package main import ( "encoding/json" "fmt" "github.com/gorilla/websocket" "github.com/labstack/echo" ) var ( upgrader = websocket.Upgrader{ReadBufferSize: 1024, WriteBufferSize: 1024} hub *wsHub ) const ( wsErrorCMD = iota wsFileInfoCMD wsFilePatchCMD wsSubscribeCMD wsUnsubscribeCMD wsWritersCMD // Update on file wsFileUpdateCMD // When new file flushed wsFileCommit ) func init() { //upgrader.CheckOrigin = func(r *http.Request) bool { // return true //} hub = new(wsHub) hub.clients = make(map[*wsCli]bool) hub.register = make(chan *wsCli) hub.unregister = make(chan *wsCli) hub.broadcast = make(chan *wsMessage) } type wsMessage struct { cli *wsCli hash uint64 CMD int `json:"cmd"` Seq int `json:"seq"` Data interface{} `json:"data"` } type wsFilePatch struct { Name string `json:"name"` Patch string `json:"patch"` } type wsCli struct { hub *wsHub conn *websocket.Conn buffer chan []byte fileOpen uint64 } func (c *wsCli) reader() { defer func() { c.hub.unregister <- c c.conn.Close() }() for { var msg wsMessage _, raw, err := c.conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { e.Logger.Errorf("error: %v", err) } break } err = json.Unmarshal(raw, &msg) if err != nil { return } msg.cli = c switch msg.CMD { case wsSubscribeCMD: fname, ok := msg.Data.(string) if !ok { goto respEmpty } msg.hash, fname = calcHash(fname) var cached *fileCache if finfo, ok := fileTree[msg.hash]; ok { if cached, ok = fileCaches[msg.hash]; !ok { cached = newFileCache(finfo) fileCaches[msg.hash] = cached } } else { cached = newFileCache(nil) cached.Hash = msg.hash cached.Path = fname fileCaches[msg.hash] = cached } if c.fileOpen != msg.hash { if c.fileOpen > 0 { if cached2, ok := fileCaches[c.fileOpen]; ok { cached2.RemoveEditor(c) } } c.fileOpen = msg.hash cached.AddEditor(c) } msg.CMD = wsWritersCMD msg.Data = cached.GetUpdate() goto respMessage case wsUnsubscribeCMD: if c.fileOpen == 0 { goto respEmpty } if cached, ok := fileCaches[c.fileOpen]; ok { cached.RemoveEditor(c) //brdMsg := wsMessage{ // cli: c, // hash: 0, // CMD: wsFileUpdateCMD, // Data: cached.Name(), //} //hub.broadcast<-&brdMsg } c.fileOpen = 0 case wsFileInfoCMD: //fname, ok := msg.Data.(string) //if !ok { goto respEmpty } //hash, _ := calcHash(fname) //if finfo, ok := fileTree[hash]; ok { // c.fileOpen = hash // if _, ok = fileCaches[hash]; !ok { // fileCaches[hash] = newFileCache(finfo) // } // msg.Data = finfo // goto respMessage //} case wsFilePatchCMD: msgStruct := struct { Data *wsFilePatch `json:"data"` }{} if err = json.Unmarshal(raw, &msgStruct); err != nil || msgStruct.Data == nil { goto respEmpty } fpatch := msgStruct.Data var fpath string msg.hash, fpath = calcHash(fpatch.Name) var cached *fileCache var ok bool if cached, ok = fileCaches[msg.hash]; !ok { finfo, _ := fileTree[msg.hash] cached = newFileCache(finfo) cached.Hash = msg.hash cached.Path = fpath fileCaches[msg.hash] = cached } if c.fileOpen != msg.hash { if c.fileOpen > 0 { if cached2, ok := fileCaches[c.fileOpen]; ok { cached2.RemoveEditor(c) } } c.fileOpen = msg.hash cached.AddEditor(c) } patches, err := dmp.PatchFromText(fpatch.Patch) if err != nil { msg.Data = err.Error() msg.CMD = wsErrorCMD goto respMessage } cached.File, _ = dmp.PatchApply(patches, cached.File) hub.broadcast<-&msg default: fmt.Printf("Unknown command #%d\n", msg.CMD) } respEmpty: if msg.Seq > 0 { msg.Data = nil goto respMessage } continue respMessage: err = c.conn.WriteJSON(msg) if err != nil { return } } } //func (c *wsCli) writer() { // defer func() { // c.conn.Close() // }() // for { // select { // case message, ok := <- c.buffer: // if !ok { // _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{}) // return // } // err := c.conn.WriteMessage(websocket.TextMessage, message) // if err != nil { return } // } // } //} type wsHub struct { clients map[*wsCli]bool register chan *wsCli unregister chan *wsCli broadcast chan *wsMessage } func (h *wsHub) run() { for { select { case client := <-h.register: fmt.Println("Connection opened: " + client.conn.RemoteAddr().String()) h.clients[client] = true case client := <-h.unregister: if _, ok := h.clients[client]; ok { fmt.Println("Connection closed: " + client.conn.RemoteAddr().String()) if client.fileOpen != 0 { if cached, ok := fileCaches[client.fileOpen]; ok { cached.RemoveEditor(client) } } delete(h.clients, client) close(client.buffer) } case message := <-h.broadcast: payload, err := json.Marshal(message) if err != nil { e.Logger.Error("Failed to marshal patch", err) continue } for client := range h.clients { if message.cli != nil && client == message.cli { continue } if client.fileOpen != message.hash { continue } err = client.conn.WriteMessage(websocket.TextMessage, payload) if err != nil { close(client.buffer) delete(h.clients, client) } //select { //case client.buffer <- message: //default: // close(client.buffer) // delete(h.clients, client) //} } } } } func handleWS(c echo.Context) error { ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil) if err != nil { return err } client := &wsCli{hub: hub, conn: ws, buffer: make(chan []byte, 256)} client.hub.register <- client go client.reader() //go client.writer() return nil }