diff options
author | Chris <ccbrown112@gmail.com> | 2017-08-16 17:23:38 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-08-16 17:23:38 -0500 |
commit | f80d50adbddf55a043dfcab5b47d7c1e22749b7d (patch) | |
tree | 5deb606debb6322716c9cdcc6c58be4f68b74223 /plugin/rpcplugin/muxer_test.go | |
parent | 4f85ed985d478ddf6692fa4f7d8d98d2a412d18c (diff) | |
download | chat-f80d50adbddf55a043dfcab5b47d7c1e22749b7d.tar.gz chat-f80d50adbddf55a043dfcab5b47d7c1e22749b7d.tar.bz2 chat-f80d50adbddf55a043dfcab5b47d7c1e22749b7d.zip |
PLT-7407: Back-end plugin mechanism (#7177)
* begin backend plugin wip
* flesh out rpcplugin. everything done except for minor supervisor stubs
* done with basic plugin infrastructure
* simplify tests
* remove unused test lines
Diffstat (limited to 'plugin/rpcplugin/muxer_test.go')
-rw-r--r-- | plugin/rpcplugin/muxer_test.go | 169 |
1 files changed, 169 insertions, 0 deletions
diff --git a/plugin/rpcplugin/muxer_test.go b/plugin/rpcplugin/muxer_test.go new file mode 100644 index 000000000..7bb63d4f8 --- /dev/null +++ b/plugin/rpcplugin/muxer_test.go @@ -0,0 +1,169 @@ +package rpcplugin + +import ( + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMuxer(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer func() { assert.NoError(t, alice.Close()) }() + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + id1, alice1 := alice.Serve() + defer func() { assert.NoError(t, alice1.Close()) }() + + id2, bob2 := bob.Serve() + defer func() { assert.NoError(t, bob2.Close()) }() + + done1 := make(chan bool) + done2 := make(chan bool) + + go func() { + bob1 := bob.Connect(id1) + defer func() { assert.NoError(t, bob1.Close()) }() + + n, err := bob1.Write([]byte("ping1.0")) + require.NoError(t, err) + assert.Equal(t, n, 7) + + n, err = bob1.Write([]byte("ping1.1")) + require.NoError(t, err) + assert.Equal(t, n, 7) + }() + + go func() { + alice2 := alice.Connect(id2) + defer func() { assert.NoError(t, alice2.Close()) }() + + n, err := alice2.Write([]byte("ping2.0")) + require.NoError(t, err) + assert.Equal(t, n, 7) + + buf := make([]byte, 20) + n, err = alice2.Read(buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("pong2.0"), buf[:n]) + + done2 <- true + }() + + go func() { + buf := make([]byte, 7) + n, err := io.ReadFull(alice1, buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("ping1.0"), buf[:n]) + + n, err = alice1.Read(buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("ping1.1"), buf[:n]) + + done1 <- true + }() + + go func() { + buf := make([]byte, 20) + n, err := bob2.Read(buf) + require.NoError(t, err) + assert.Equal(t, n, 7) + assert.Equal(t, []byte("ping2.0"), buf[:n]) + + n, err = bob2.Write([]byte("pong2.0")) + require.NoError(t, err) + assert.Equal(t, n, 7) + }() + + <-done1 + <-done2 +} + +// Closing a muxer during a read should unblock, but return an error. +func TestMuxer_CloseDuringRead(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + _, s := alice.Serve() + + go alice.Close() + buf := make([]byte, 20) + n, err := s.Read(buf) + assert.Equal(t, 0, n) + assert.NotNil(t, err) + assert.NotEqual(t, io.EOF, err) +} + +// Closing a stream during a read should unblock and return io.EOF since this is the way to +// gracefully close a connection. +func TestMuxer_StreamCloseDuringRead(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer func() { assert.NoError(t, alice.Close()) }() + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + _, s := alice.Serve() + + go s.Close() + buf := make([]byte, 20) + n, err := s.Read(buf) + assert.Equal(t, 0, n) + assert.Equal(t, io.EOF, err) +} + +// Closing a muxer during a write should unblock, but return an error. +func TestMuxer_CloseDuringWrite(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + + // Don't connect bob to let writes will block forever. + defer r2.Close() + defer w1.Close() + + _, s := alice.Serve() + + go alice.Close() + buf := make([]byte, 20) + n, err := s.Write(buf) + assert.Equal(t, 0, n) + assert.NotNil(t, err) + assert.NotEqual(t, io.EOF, err) +} + +func TestMuxer_ReadWrite(t *testing.T) { + r1, w1 := io.Pipe() + r2, w2 := io.Pipe() + + alice := NewMuxer(NewReadWriteCloser(r1, w2), false) + defer func() { assert.NoError(t, alice.Close()) }() + + bob := NewMuxer(NewReadWriteCloser(r2, w1), true) + defer func() { assert.NoError(t, bob.Close()) }() + + go alice.Write([]byte("hello")) + buf := make([]byte, 20) + n, err := bob.Read(buf) + assert.Equal(t, 5, n) + assert.Nil(t, err) + assert.Equal(t, []byte("hello"), buf[:n]) +} |