diff options
Diffstat (limited to 'plugin/rpcplugin/muxer.go')
-rw-r--r-- | plugin/rpcplugin/muxer.go | 41 |
1 files changed, 25 insertions, 16 deletions
diff --git a/plugin/rpcplugin/muxer.go b/plugin/rpcplugin/muxer.go index a2bfbf8b6..393a122c4 100644 --- a/plugin/rpcplugin/muxer.go +++ b/plugin/rpcplugin/muxer.go @@ -114,20 +114,22 @@ func (m *Muxer) write(p []byte, sid int64) (int, error) { if m.IsClosed() { return 0, fmt.Errorf("muxer closed") } - buf := make([]byte, 10) - n := binary.PutVarint(buf, sid) + var buf [10]byte + n := binary.PutVarint(buf[:], sid) if _, err := m.conn.Write(buf[:n]); err != nil { m.shutdown(err) return 0, err } - n = binary.PutVarint(buf, int64(len(p))) + n = binary.PutVarint(buf[:], int64(len(p))) if _, err := m.conn.Write(buf[:n]); err != nil { m.shutdown(err) return 0, err } - if _, err := m.conn.Write(p); err != nil { - m.shutdown(err) - return 0, err + if len(p) > 0 { + if _, err := m.conn.Write(p); err != nil { + m.shutdown(err) + return 0, err + } } return len(p), nil } @@ -180,7 +182,11 @@ func (m *Muxer) loop() error { } continue } - _, err = io.CopyN(&stream.readBuf, reader, len) + if len == 0 { + stream.remoteClosed = true + } else { + _, err = io.CopyN(&stream.readBuf, reader, len) + } stream.mutex.Unlock() if err != nil { return err @@ -207,13 +213,14 @@ func (m *Muxer) shutdown(err error) { } type muxerStream struct { - id int64 - muxer *Muxer - readBuf bytes.Buffer - mutex *sync.Mutex - readWake *sync.Cond - isClosed bool - closeErr error + id int64 + muxer *Muxer + readBuf bytes.Buffer + mutex *sync.Mutex + readWake *sync.Cond + isClosed bool + remoteClosed bool + closeErr error } func (s *muxerStream) Read(p []byte) (int, error) { @@ -225,8 +232,9 @@ func (s *muxerStream) Read(p []byte) (int, error) { } else if s.isClosed { return 0, io.EOF } else if s.readBuf.Len() > 0 { - n, err := s.readBuf.Read(p) - return n, err + return s.readBuf.Read(p) + } else if s.remoteClosed { + return 0, io.EOF } s.readWake.Wait() } @@ -245,6 +253,7 @@ func (s *muxerStream) Close() error { s.mutex.Lock() defer s.mutex.Unlock() if !s.isClosed { + s.muxer.write(nil, s.id) s.isClosed = true s.muxer.rm(s.id) } |