||
- package main
- import (
- "encoding/json"
- "fmt"
- "github.com/gorilla/websocket"
- "github.com/labstack/echo"
- )
- var (
- upgrader = websocket.Upgrader{ReadBufferSize: 1024, WriteBufferSize: 1024}
- hub *wsHub
- )
- const (
- wsErrorCMD = iota
- wsFileInfoCMD
- wsFilePatchCMD
- wsSubscribeCMD
- wsUnsubscribeCMD
- wsWritersCMD // Update on file
- wsFileUpdateCMD // When new file flushed
- wsFileCommit
- )
- func init() {
- //upgrader.CheckOrigin = func(r *http.Request) bool {
- // return true
- //}
- hub = new(wsHub)
- hub.clients = make(map[*wsCli]bool)
- hub.register = make(chan *wsCli)
- hub.unregister = make(chan *wsCli)
- hub.broadcast = make(chan *wsMessage)
- }
- type wsMessage struct {
- cli *wsCli
- hash uint64
- CMD int `json:"cmd"`
- Seq int `json:"seq"`
- Data interface{} `json:"data"`
- }
- type wsFilePatch struct {
- Name string `json:"name"`
- Patch string `json:"patch"`
- }
- type wsCli struct {
- hub *wsHub
- conn *websocket.Conn
- buffer chan []byte
- fileOpen uint64
- }
- func (c *wsCli) reader() {
- defer func() {
- c.hub.unregister <- c
- c.conn.Close()
- }()
- for {
- var msg wsMessage
- _, raw, err := c.conn.ReadMessage()
- if err != nil {
- if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
- e.Logger.Errorf("error: %v", err)
- }
- break
- }
- err = json.Unmarshal(raw, &msg)
- if err != nil {
- return
- }
- msg.cli = c
- switch msg.CMD {
- case wsSubscribeCMD:
- fname, ok := msg.Data.(string)
- if !ok { goto respEmpty }
- msg.hash, fname = calcHash(fname)
- var cached *fileCache
- if finfo, ok := fileTree[msg.hash]; ok {
- if cached, ok = fileCaches[msg.hash]; !ok {
- cached = newFileCache(finfo)
- fileCaches[msg.hash] = cached
- }
- } else {
- cached = newFileCache(nil)
- cached.Hash = msg.hash
- cached.Path = fname
- fileCaches[msg.hash] = cached
- }
- if c.fileOpen != msg.hash {
- if c.fileOpen > 0 {
- if cached2, ok := fileCaches[c.fileOpen]; ok {
- cached2.RemoveEditor(c)
- }
- }
- c.fileOpen = msg.hash
- cached.AddEditor(c)
- }
- msg.CMD = wsWritersCMD
- msg.Data = cached.GetUpdate()
- goto respMessage
- case wsUnsubscribeCMD:
- if c.fileOpen == 0 {
- goto respEmpty
- }
- if cached, ok := fileCaches[c.fileOpen]; ok {
- cached.RemoveEditor(c)
- //brdMsg := wsMessage{
- // cli: c,
- // hash: 0,
- // CMD: wsFileUpdateCMD,
- // Data: cached.Name(),
- //}
- //hub.broadcast<-&brdMsg
- }
- c.fileOpen = 0
- case wsFileInfoCMD:
- //fname, ok := msg.Data.(string)
- //if !ok { goto respEmpty }
- //hash, _ := calcHash(fname)
- //if finfo, ok := fileTree[hash]; ok {
- // c.fileOpen = hash
- // if _, ok = fileCaches[hash]; !ok {
- // fileCaches[hash] = newFileCache(finfo)
- // }
- // msg.Data = finfo
- // goto respMessage
- //}
- case wsFilePatchCMD:
- msgStruct := struct {
- Data *wsFilePatch `json:"data"`
- }{}
- if err = json.Unmarshal(raw, &msgStruct); err != nil || msgStruct.Data == nil {
- goto respEmpty
- }
- fpatch := msgStruct.Data
- var fpath string
- msg.hash, fpath = calcHash(fpatch.Name)
- var cached *fileCache
- var ok bool
- if cached, ok = fileCaches[msg.hash]; !ok {
- finfo, _ := fileTree[msg.hash]
- cached = newFileCache(finfo)
- cached.Hash = msg.hash
- cached.Path = fpath
- fileCaches[msg.hash] = cached
- }
- if c.fileOpen != msg.hash {
- if c.fileOpen > 0 {
- if cached2, ok := fileCaches[c.fileOpen]; ok {
- cached2.RemoveEditor(c)
- }
- }
- c.fileOpen = msg.hash
- cached.AddEditor(c)
- }
- patches, err := dmp.PatchFromText(fpatch.Patch)
- if err != nil {
- msg.Data = err.Error()
- msg.CMD = wsErrorCMD
- goto respMessage
- }
- cached.File, _ = dmp.PatchApply(patches, cached.File)
- hub.broadcast<-&msg
- default:
- fmt.Printf("Unknown command #%d\n", msg.CMD)
- }
- respEmpty:
- if msg.Seq > 0 {
- msg.Data = nil
- goto respMessage
- }
- continue
- respMessage:
- err = c.conn.WriteJSON(msg)
- if err != nil { return }
- }
- }
- //func (c *wsCli) writer() {
- // defer func() {
- // c.conn.Close()
- // }()
- // for {
- // select {
- // case message, ok := <- c.buffer:
- // if !ok {
- // _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
- // return
- // }
- // err := c.conn.WriteMessage(websocket.TextMessage, message)
- // if err != nil { return }
- // }
- // }
- //}
- type wsHub struct {
- clients map[*wsCli]bool
- register chan *wsCli
- unregister chan *wsCli
- broadcast chan *wsMessage
- }
- func (h *wsHub) run() {
- for {
- select {
- case client := <-h.register:
- fmt.Println("Connection opened: " + client.conn.RemoteAddr().String())
- h.clients[client] = true
- case client := <-h.unregister:
- if _, ok := h.clients[client]; ok {
- fmt.Println("Connection closed: " + client.conn.RemoteAddr().String())
- if client.fileOpen != 0 {
- if cached, ok := fileCaches[client.fileOpen]; ok {
- cached.RemoveEditor(client)
- }
- }
- delete(h.clients, client)
- close(client.buffer)
- }
- case message := <-h.broadcast:
- payload, err := json.Marshal(message)
- if err != nil {
- e.Logger.Error("Failed to marshal patch", err)
- continue
- }
- for client := range h.clients {
- if message.cli != nil && client == message.cli { continue }
- if client.fileOpen != message.hash { continue }
- err = client.conn.WriteMessage(websocket.TextMessage, payload)
- if err != nil {
- close(client.buffer)
- delete(h.clients, client)
- }
- //select {
- //case client.buffer <- message:
- //default:
- // close(client.buffer)
- // delete(h.clients, client)
- //}
- }
- }
- }
- }
- func handleWS(c echo.Context) error {
- ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
- if err != nil {
- return err
- }
- client := &wsCli{hub: hub, conn: ws, buffer: make(chan []byte, 256)}
- client.hub.register <- client
- go client.reader()
- //go client.writer()
- return nil
- }
|