ws.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/gorilla/websocket"
  6. "github.com/labstack/echo"
  7. )
  8. var (
  9. upgrader = websocket.Upgrader{ReadBufferSize: 1024, WriteBufferSize: 1024}
  10. hub *wsHub
  11. )
  12. const (
  13. wsErrorCMD = iota
  14. wsFileInfoCMD
  15. wsFilePatchCMD
  16. wsSubscribeCMD
  17. wsUnsubscribeCMD
  18. wsWritersCMD // Update on file
  19. wsFileUpdateCMD // When new file flushed
  20. wsFileCommit
  21. )
  22. func init() {
  23. hub = new(wsHub)
  24. hub.clients = make(map[*wsCli]bool)
  25. hub.register = make(chan *wsCli)
  26. hub.unregister = make(chan *wsCli)
  27. hub.broadcast = make(chan *wsMessage)
  28. }
  29. type wsMessage struct {
  30. cli *wsCli
  31. hash uint64
  32. CMD int `json:"cmd"`
  33. Seq int `json:"seq"`
  34. Data interface{} `json:"data"`
  35. }
  36. type wsFilePatch struct {
  37. Name string `json:"name"`
  38. Patch string `json:"patch"`
  39. }
  40. type wsCli struct {
  41. hub *wsHub
  42. conn *websocket.Conn
  43. buffer chan []byte
  44. fileOpen uint64
  45. }
  46. func (c *wsCli) reader() {
  47. defer func() {
  48. c.hub.unregister <- c
  49. c.conn.Close()
  50. }()
  51. for {
  52. var msg wsMessage
  53. _, raw, err := c.conn.ReadMessage()
  54. if err != nil {
  55. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  56. e.Logger.Errorf("error: %v", err)
  57. }
  58. break
  59. }
  60. err = json.Unmarshal(raw, &msg)
  61. if err != nil {
  62. return
  63. }
  64. msg.cli = c
  65. switch msg.CMD {
  66. case wsSubscribeCMD:
  67. fname, ok := msg.Data.(string)
  68. if !ok { goto respEmpty }
  69. msg.hash, fname = calcHash(fname)
  70. var cached *fileCache
  71. if finfo, ok := fileTree[msg.hash]; ok {
  72. if cached, ok = fileCaches[msg.hash]; !ok {
  73. cached = newFileCache(finfo)
  74. fileCaches[msg.hash] = cached
  75. }
  76. } else {
  77. cached = newFileCache(nil)
  78. cached.Hash = msg.hash
  79. cached.Path = fname
  80. fileCaches[msg.hash] = cached
  81. }
  82. if c.fileOpen != msg.hash {
  83. if c.fileOpen > 0 {
  84. if cached2, ok := fileCaches[c.fileOpen]; ok {
  85. cached2.RemoveEditor(c)
  86. }
  87. }
  88. c.fileOpen = msg.hash
  89. cached.AddEditor(c)
  90. }
  91. msg.CMD = wsWritersCMD
  92. msg.Data = cached.GetUpdate()
  93. goto respMessage
  94. case wsUnsubscribeCMD:
  95. if c.fileOpen == 0 {
  96. goto respEmpty
  97. }
  98. if cached, ok := fileCaches[c.fileOpen]; ok {
  99. cached.RemoveEditor(c)
  100. //brdMsg := wsMessage{
  101. // cli: c,
  102. // hash: 0,
  103. // CMD: wsFileUpdateCMD,
  104. // Data: cached.Name(),
  105. //}
  106. //hub.broadcast<-&brdMsg
  107. }
  108. c.fileOpen = 0
  109. case wsFileInfoCMD:
  110. //fname, ok := msg.Data.(string)
  111. //if !ok { goto respEmpty }
  112. //hash, _ := calcHash(fname)
  113. //if finfo, ok := fileTree[hash]; ok {
  114. // c.fileOpen = hash
  115. // if _, ok = fileCaches[hash]; !ok {
  116. // fileCaches[hash] = newFileCache(finfo)
  117. // }
  118. // msg.Data = finfo
  119. // goto respMessage
  120. //}
  121. case wsFilePatchCMD:
  122. msgStruct := struct {
  123. Data *wsFilePatch `json:"data"`
  124. }{}
  125. if err = json.Unmarshal(raw, &msgStruct); err != nil || msgStruct.Data == nil {
  126. goto respEmpty
  127. }
  128. fpatch := msgStruct.Data
  129. var fpath string
  130. msg.hash, fpath = calcHash(fpatch.Name)
  131. var cached *fileCache
  132. var ok bool
  133. if cached, ok = fileCaches[msg.hash]; !ok {
  134. finfo, _ := fileTree[msg.hash]
  135. cached = newFileCache(finfo)
  136. cached.Hash = msg.hash
  137. cached.Path = fpath
  138. fileCaches[msg.hash] = cached
  139. }
  140. if c.fileOpen != msg.hash {
  141. if c.fileOpen > 0 {
  142. if cached2, ok := fileCaches[c.fileOpen]; ok {
  143. cached2.RemoveEditor(c)
  144. }
  145. }
  146. c.fileOpen = msg.hash
  147. cached.AddEditor(c)
  148. }
  149. patches, err := dmp.PatchFromText(fpatch.Patch)
  150. if err != nil {
  151. msg.Data = err.Error()
  152. msg.CMD = wsErrorCMD
  153. goto respMessage
  154. }
  155. cached.File, _ = dmp.PatchApply(patches, cached.File)
  156. hub.broadcast<-&msg
  157. default:
  158. fmt.Printf("Unknown command #%d\n", msg.CMD)
  159. }
  160. respEmpty:
  161. if msg.Seq > 0 {
  162. msg.Data = nil
  163. goto respMessage
  164. }
  165. continue
  166. respMessage:
  167. err = c.conn.WriteJSON(msg)
  168. if err != nil { return }
  169. }
  170. }
  171. //func (c *wsCli) writer() {
  172. // defer func() {
  173. // c.conn.Close()
  174. // }()
  175. // for {
  176. // select {
  177. // case message, ok := <- c.buffer:
  178. // if !ok {
  179. // _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
  180. // return
  181. // }
  182. // err := c.conn.WriteMessage(websocket.TextMessage, message)
  183. // if err != nil { return }
  184. // }
  185. // }
  186. //}
  187. type wsHub struct {
  188. clients map[*wsCli]bool
  189. register chan *wsCli
  190. unregister chan *wsCli
  191. broadcast chan *wsMessage
  192. }
  193. func (h *wsHub) run() {
  194. for {
  195. select {
  196. case client := <-h.register:
  197. fmt.Println("Connection opened: " + client.conn.RemoteAddr().String())
  198. h.clients[client] = true
  199. case client := <-h.unregister:
  200. if _, ok := h.clients[client]; ok {
  201. fmt.Println("Connection closed: " + client.conn.RemoteAddr().String())
  202. if client.fileOpen != 0 {
  203. if cached, ok := fileCaches[client.fileOpen]; ok {
  204. cached.RemoveEditor(client)
  205. }
  206. }
  207. delete(h.clients, client)
  208. close(client.buffer)
  209. }
  210. case message := <-h.broadcast:
  211. payload, err := json.Marshal(message)
  212. if err != nil {
  213. e.Logger.Error("Failed to marshal patch", err)
  214. continue
  215. }
  216. for client := range h.clients {
  217. if message.cli != nil && client == message.cli { continue }
  218. if client.fileOpen != message.hash { continue }
  219. err = client.conn.WriteMessage(websocket.TextMessage, payload)
  220. if err != nil {
  221. close(client.buffer)
  222. delete(h.clients, client)
  223. }
  224. //select {
  225. //case client.buffer <- message:
  226. //default:
  227. // close(client.buffer)
  228. // delete(h.clients, client)
  229. //}
  230. }
  231. }
  232. }
  233. }
  234. func handleWS(c echo.Context) error {
  235. ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
  236. if err != nil {
  237. return err
  238. }
  239. client := &wsCli{hub: hub, conn: ws, buffer: make(chan []byte, 256)}
  240. client.hub.register <- client
  241. go client.reader()
  242. //go client.writer()
  243. return nil
  244. }