// websockets.js
  1. 'use strict';
  2. const db = require('../models/db');
  3. const Sequelize = require('sequelize');
  4. const Op = Sequelize.Op;
  5. const config = require('config');
  6. const WebSocketServer = require('ws').Server;
  7. //const RedisConnection = require('ioredis');
  8. const async = require('async');
  9. const _ = require("underscore");
  10. const crypto = require('crypto');
  11. const redisMock = require("./redis.js");
  12. module.exports = {
  13. startWebsockets: function(server) {
  14. this.setupSubscription();
  15. if (!this.current_websockets) {
  16. if (config.get("redis_mock")) {
  17. this.state = redisMock.getConnection();
  18. } else {
  19. this.state = new RedisConnection(6379, process.env.REDIS_PORT_6379_TCP_ADDR || config.get("redis_host"));
  20. }
  21. this.current_websockets = [];
  22. }
  23. const wss = new WebSocketServer({ server:server, path: "/socket" });
  24. wss.on('connection', function(ws) {
  25. this.state.incr("socket_id", function(err, socketCounter) {
  26. const socketId = "socket_" + socketCounter + "_" + crypto.randomBytes(64).toString('hex').substring(0,8);
  27. const serverScope = this;
  28. ws.on('message', function(msgString){
  29. const socket = this;
  30. const msg = JSON.parse(msgString);
  31. if(msg.action == "auth"){
  32. const token = msg.auth_token;
  33. const editorName = msg.editor_name;
  34. const editorAuth = msg.editor_auth;
  35. const spaceId = msg.space_id;
  36. db.Space.findOne({where: {"_id": spaceId}}).then(space => {
  37. if (space) {
  38. const upgradeSocket = function() {
  39. if (token) {
  40. db.findUserBySessionToken(token, function(err, user) {
  41. if (err) {
  42. console.error(err, user);
  43. } else {
  44. if (user) {
  45. serverScope.addUserInSpace(user._id, space, ws, function(err){
  46. serverScope.addLocalUser(user._id, ws);
  47. console.log("[websockets] user " + user.email + " online in space " + space._id);
  48. });
  49. }
  50. }
  51. });
  52. } else {
  53. const anonymousUserId = space._id + "-" + editorName;
  54. if(space.access_mode == "private" && space.edit_hash != editorAuth){
  55. console.error("closing websocket: unauthed.");
  56. ws.send(JSON.stringify({error: "auth_failed"}));
  57. // ws.close();
  58. return;
  59. }
  60. serverScope.addUserInSpace(anonymousUserId, space, ws, function(err){
  61. serverScope.addLocalUser(anonymousUserId, ws);
  62. console.log("[websockets] anonymous user " + anonymousUserId + " online in space " + space._id);
  63. });
  64. }
  65. };
  66. if (!ws.id) {
  67. ws['id'] = socketId;
  68. try {
  69. ws.send(JSON.stringify({"action": "init", "channel_id": socketId}));
  70. } catch (e) {
  71. console.log("ws.send error: "+e);
  72. }
  73. }
  74. if (ws.space_id) {
  75. serverScope.removeUserInSpace(ws.space_id, ws, function(err) {
  76. upgradeSocket();
  77. });
  78. } else {
  79. upgradeSocket();
  80. }
  81. } else {
  82. ws.send(JSON.stringify({error: "space not found"}));
  83. ws.close();
  84. return;
  85. }
  86. });
  87. } else if (msg.action == "cursor" || msg.action == "viewport" || msg.action=="media") {
  88. msg.space_id = socket.space_id;
  89. msg.from_socket_id = socket.id;
  90. serverScope.state.publish('cursors', JSON.stringify(msg));
  91. }
  92. });
  93. ws.on('close', function(evt) {
  94. console.log("websocket closed: ", ws.id, ws.space_id);
  95. const spaceId = ws.space_id;
  96. serverScope.removeUserInSpace(spaceId, ws, function(err) {
  97. this.removeLocalUser(ws, function(err) {
  98. }.bind(this));
  99. }.bind(this));
  100. }.bind(this));
  101. ws.on('error', function(ws, err) {
  102. console.error(err, res);
  103. }.bind(this));
  104. }.bind(this));
  105. }.bind(this));
  106. },
  107. setupSubscription: function() {
  108. if (config.get("redis_mock")) {
  109. this.cursorSubscriber = redisMock.getConnection().subscribe(['cursors', 'users', 'updates'], function (err, count) {
  110. console.log("[redis-mock] websockets subscribed to " + count + " topics." );
  111. });
  112. } else {
  113. this.cursorSubscriber = new RedisConnection(6379, process.env.REDIS_PORT_6379_TCP_ADDR || config.get("redis_host"));
  114. this.cursorSubscriber.subscribe(['cursors', 'users', 'updates'], function (err, count) {
  115. console.log("[redis] websockets subscribed to " + count + " topics." );
  116. });
  117. }
  118. this.cursorSubscriber.on('message', function (channel, rawMessage) {
  119. const msg = JSON.parse(rawMessage);
  120. const spaceId = msg.space_id;
  121. const websockets = this.current_websockets;
  122. if(channel === "updates") {
  123. for(let i=0;i<websockets.length;i++) {
  124. const ws = websockets[i];
  125. if(ws.readyState === 1) {
  126. ws.send(JSON.stringify(msg));
  127. }
  128. }
  129. } else if(channel === "users") {
  130. const usersList = msg.users;
  131. if (usersList) {
  132. for(let i=0;i<usersList.length;i++) {
  133. const activeUser = usersList[i];
  134. let user_id;
  135. if (activeUser._id) {
  136. user_id = activeUser._id;
  137. } else {
  138. user_id = spaceId + "-" + (activeUser.nickname||"anonymous");
  139. }
  140. for (let a=0; a < websockets.length; a++) {
  141. const ws = websockets[a];
  142. if(ws.readyState === 1){
  143. if(ws.space_id == spaceId) {
  144. ws.send(JSON.stringify({"action": "status_update", space_id: spaceId, users: usersList}));
  145. } else {
  146. //console.log("space id not matching", spaceId, ws.space_id);
  147. }
  148. } else {
  149. // FIXME SHOULD CLEANUP SOCKET HERE
  150. console.error("socket in wrong state", ws.readyState);
  151. if(ws.readyState == 3) {
  152. this.removeLocalUser(ws, (err) => {
  153. console.log("old websocket removed");
  154. });
  155. }
  156. }
  157. }
  158. }
  159. } else {
  160. console.error("userlist undefined for websocket");
  161. }
  162. } else if(channel === "cursors") {
  163. const socketId = msg.from_socket_id;
  164. for (let i=0;i<websockets.length;i++) {
  165. const ws = websockets[i];
  166. if (ws.readyState === 1) {
  167. if (ws.space_id && spaceId) {
  168. if ((ws.space_id == spaceId) && (ws.id !== socketId)) {
  169. ws.send(JSON.stringify(msg));
  170. }
  171. } else {
  172. console.log("space id not set, ignoring");
  173. }
  174. }
  175. }
  176. }
  177. }.bind(this));
  178. },
  179. addLocalUser: function(username, ws) {
  180. if (ws.added) {
  181. return;
  182. }
  183. ws.added = true;
  184. this.current_websockets.push(ws);
  185. },
  186. removeLocalUser: function(ws, cb) {
  187. const idx = this.current_websockets.indexOf(ws);
  188. if(idx > -1) {
  189. this.removed_items = this.current_websockets.splice(idx, 1);
  190. console.log("removed local socket, current online on this process: ", this.current_websockets.length);
  191. } else {
  192. console.log("websocket not found to remove");
  193. }
  194. this.state.del(ws.id+"", function(err, res) {
  195. if (err) console.error(err, res);
  196. else {
  197. this.removeUserInSpace(ws.space_id, ws, (err) => {
  198. console.log("removed user from space list");
  199. this.distributeUsers(ws.space_id);
  200. })
  201. if(cb)
  202. cb(err);
  203. }
  204. }.bind(this));
  205. },
  206. addUserInSpace: function(username, space, ws, cb) {
  207. console.log("[websockets] user "+username+" in "+space.access_mode +" space " + space._id + " with socket " + ws.id);
  208. this.state.set(ws.id+"", username+"", function(err, res) {
  209. if(err) console.error(err, res);
  210. else {
  211. this.state.sadd("space_" + space._id, ws.id, function(err, res) {
  212. if(err) cb(err);
  213. else {
  214. ws['space_id'] = space._id.toString();
  215. this.distributeUsers(ws.space_id);
  216. if(cb)
  217. cb();
  218. }
  219. }.bind(this));
  220. }
  221. }.bind(this));
  222. },
  223. removeUserInSpace: function(spaceId, ws, cb) {
  224. this.state.srem("space_" + spaceId, ws.id+"", function(err, res) {
  225. if (err) cb(err);
  226. else {
  227. console.log("[websockets] socket "+ ws.id + " went offline in space " + spaceId);
  228. this.distributeUsers(spaceId);
  229. ws['space_id'] = null;
  230. if (cb)
  231. cb();
  232. }
  233. }.bind(this));
  234. },
  235. distributeUsers: function(spaceId) {
  236. if (!spaceId)
  237. return;
  238. /*this.state.smembers("space_" + spaceId, function(err, list) {
  239. async.map(list, function(item, callback) {
  240. this.state.get(item, function(err, userId) {
  241. console.log(item, "->", userId);
  242. callback(null, userId);
  243. });
  244. }.bind(this), function(err, userIds) {
  245. const uniqueUserIds = _.unique(userIds);
  246. const validUserIds = _.filter(uniqueUserIds, function(uId) {
  247. return mongoose.Types.ObjectId.isValid(uId);
  248. });
  249. const nonValidUserIds = _.filter(uniqueUserIds, function(uId) {
  250. return (uId !== null && !mongoose.Types.ObjectId.isValid(uId));
  251. });
  252. const anonymousUsers = _.map(nonValidUserIds, function(nonValidId) {
  253. const realNickname = nonValidId.slice(nonValidId.indexOf("-")+1);
  254. return {nickname: realNickname, email: null, avatar_thumbnail_uri: null };
  255. });
  256. db.User.findAll({where: {
  257. "_id" : { "$in" : validUserIds }}, attributes: ["nickname","email","avatar_thumbnail_uri"]})
  258. .then(users) {
  259. const allUsers = users.concat(anonymousUsers);
  260. const strUsers = JSON.stringify({users: allUsers, space_id: spaceId});
  261. this.state.publish("users", strUsers);
  262. }.bind(this));
  263. }.bind(this));
  264. }.bind(this));*/
  265. }
  266. };