ws.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/gorilla/websocket"
  6. "github.com/labstack/echo"
  7. )
  8. var (
  9. upgrader = websocket.Upgrader{ReadBufferSize: 1024, WriteBufferSize: 1024}
  10. hub *wsHub
  11. )
  12. const (
  13. wsErrorCMD = iota
  14. wsFileInfoCMD
  15. wsFilePatchCMD
  16. wsSubscribeCMD
  17. wsUnsubscribeCMD
  18. wsWritersCMD // Update on file
  19. wsFileUpdateCMD // When new file flushed
  20. wsFileCommit
  21. )
  22. func init() {
  23. //upgrader.CheckOrigin = func(r *http.Request) bool {
  24. // return true
  25. //}
  26. hub = new(wsHub)
  27. hub.clients = make(map[*wsCli]bool)
  28. hub.register = make(chan *wsCli)
  29. hub.unregister = make(chan *wsCli)
  30. hub.broadcast = make(chan *wsMessage)
  31. }
  32. type wsMessage struct {
  33. cli *wsCli
  34. hash uint64
  35. CMD int `json:"cmd"`
  36. Seq int `json:"seq"`
  37. Data interface{} `json:"data"`
  38. }
  39. type wsFilePatch struct {
  40. Name string `json:"name"`
  41. Patch string `json:"patch"`
  42. }
  43. type wsCli struct {
  44. hub *wsHub
  45. conn *websocket.Conn
  46. buffer chan []byte
  47. fileOpen uint64
  48. }
  49. func (c *wsCli) reader() {
  50. defer func() {
  51. c.hub.unregister <- c
  52. c.conn.Close()
  53. }()
  54. for {
  55. var msg wsMessage
  56. _, raw, err := c.conn.ReadMessage()
  57. if err != nil {
  58. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  59. e.Logger.Errorf("error: %v", err)
  60. }
  61. break
  62. }
  63. err = json.Unmarshal(raw, &msg)
  64. if err != nil {
  65. return
  66. }
  67. msg.cli = c
  68. switch msg.CMD {
  69. case wsSubscribeCMD:
  70. fname, ok := msg.Data.(string)
  71. if !ok { goto respEmpty }
  72. msg.hash, fname = calcHash(fname)
  73. var cached *fileCache
  74. if finfo, ok := fileTree[msg.hash]; ok {
  75. if cached, ok = fileCaches[msg.hash]; !ok {
  76. cached = newFileCache(finfo)
  77. fileCaches[msg.hash] = cached
  78. }
  79. } else {
  80. cached = newFileCache(nil)
  81. cached.Hash = msg.hash
  82. cached.Path = fname
  83. fileCaches[msg.hash] = cached
  84. }
  85. if c.fileOpen != msg.hash {
  86. if c.fileOpen > 0 {
  87. if cached2, ok := fileCaches[c.fileOpen]; ok {
  88. cached2.RemoveEditor(c)
  89. }
  90. }
  91. c.fileOpen = msg.hash
  92. cached.AddEditor(c)
  93. }
  94. msg.CMD = wsWritersCMD
  95. msg.Data = cached.GetUpdate()
  96. goto respMessage
  97. case wsUnsubscribeCMD:
  98. if c.fileOpen == 0 {
  99. goto respEmpty
  100. }
  101. if cached, ok := fileCaches[c.fileOpen]; ok {
  102. cached.RemoveEditor(c)
  103. //brdMsg := wsMessage{
  104. // cli: c,
  105. // hash: 0,
  106. // CMD: wsFileUpdateCMD,
  107. // Data: cached.Name(),
  108. //}
  109. //hub.broadcast<-&brdMsg
  110. }
  111. c.fileOpen = 0
  112. case wsFileInfoCMD:
  113. //fname, ok := msg.Data.(string)
  114. //if !ok { goto respEmpty }
  115. //hash, _ := calcHash(fname)
  116. //if finfo, ok := fileTree[hash]; ok {
  117. // c.fileOpen = hash
  118. // if _, ok = fileCaches[hash]; !ok {
  119. // fileCaches[hash] = newFileCache(finfo)
  120. // }
  121. // msg.Data = finfo
  122. // goto respMessage
  123. //}
  124. case wsFilePatchCMD:
  125. msgStruct := struct {
  126. Data *wsFilePatch `json:"data"`
  127. }{}
  128. if err = json.Unmarshal(raw, &msgStruct); err != nil || msgStruct.Data == nil {
  129. goto respEmpty
  130. }
  131. fpatch := msgStruct.Data
  132. var fpath string
  133. msg.hash, fpath = calcHash(fpatch.Name)
  134. var cached *fileCache
  135. var ok bool
  136. if cached, ok = fileCaches[msg.hash]; !ok {
  137. finfo, _ := fileTree[msg.hash]
  138. cached = newFileCache(finfo)
  139. cached.Hash = msg.hash
  140. cached.Path = fpath
  141. fileCaches[msg.hash] = cached
  142. }
  143. if c.fileOpen != msg.hash {
  144. if c.fileOpen > 0 {
  145. if cached2, ok := fileCaches[c.fileOpen]; ok {
  146. cached2.RemoveEditor(c)
  147. }
  148. }
  149. c.fileOpen = msg.hash
  150. cached.AddEditor(c)
  151. }
  152. patches, err := dmp.PatchFromText(fpatch.Patch)
  153. if err != nil {
  154. msg.Data = err.Error()
  155. msg.CMD = wsErrorCMD
  156. goto respMessage
  157. }
  158. cached.File, _ = dmp.PatchApply(patches, cached.File)
  159. hub.broadcast<-&msg
  160. default:
  161. fmt.Printf("Unknown command #%d\n", msg.CMD)
  162. }
  163. respEmpty:
  164. if msg.Seq > 0 {
  165. msg.Data = nil
  166. goto respMessage
  167. }
  168. continue
  169. respMessage:
  170. err = c.conn.WriteJSON(msg)
  171. if err != nil { return }
  172. }
  173. }
  174. //func (c *wsCli) writer() {
  175. // defer func() {
  176. // c.conn.Close()
  177. // }()
  178. // for {
  179. // select {
  180. // case message, ok := <- c.buffer:
  181. // if !ok {
  182. // _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
  183. // return
  184. // }
  185. // err := c.conn.WriteMessage(websocket.TextMessage, message)
  186. // if err != nil { return }
  187. // }
  188. // }
  189. //}
  190. type wsHub struct {
  191. clients map[*wsCli]bool
  192. register chan *wsCli
  193. unregister chan *wsCli
  194. broadcast chan *wsMessage
  195. }
  196. func (h *wsHub) run() {
  197. for {
  198. select {
  199. case client := <-h.register:
  200. fmt.Println("Connection opened: " + client.conn.RemoteAddr().String())
  201. h.clients[client] = true
  202. case client := <-h.unregister:
  203. if _, ok := h.clients[client]; ok {
  204. fmt.Println("Connection closed: " + client.conn.RemoteAddr().String())
  205. if client.fileOpen != 0 {
  206. if cached, ok := fileCaches[client.fileOpen]; ok {
  207. cached.RemoveEditor(client)
  208. }
  209. }
  210. delete(h.clients, client)
  211. close(client.buffer)
  212. }
  213. case message := <-h.broadcast:
  214. payload, err := json.Marshal(message)
  215. if err != nil {
  216. e.Logger.Error("Failed to marshal patch", err)
  217. continue
  218. }
  219. for client := range h.clients {
  220. if message.cli != nil && client == message.cli { continue }
  221. if client.fileOpen != message.hash { continue }
  222. err = client.conn.WriteMessage(websocket.TextMessage, payload)
  223. if err != nil {
  224. close(client.buffer)
  225. delete(h.clients, client)
  226. }
  227. //select {
  228. //case client.buffer <- message:
  229. //default:
  230. // close(client.buffer)
  231. // delete(h.clients, client)
  232. //}
  233. }
  234. }
  235. }
  236. }
  237. func handleWS(c echo.Context) error {
  238. ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
  239. if err != nil {
  240. return err
  241. }
  242. client := &wsCli{hub: hub, conn: ws, buffer: make(chan []byte, 256)}
  243. client.hub.register <- client
  244. go client.reader()
  245. //go client.writer()
  246. return nil
  247. }