diff options
Diffstat (limited to 'plugin/rpcplugin')
-rw-r--r-- | plugin/rpcplugin/rpcplugintest/supervisor.go | 39 | ||||
-rw-r--r-- | plugin/rpcplugin/supervisor.go | 15 |
2 files changed, 51 insertions, 3 deletions
diff --git a/plugin/rpcplugin/rpcplugintest/supervisor.go b/plugin/rpcplugin/rpcplugintest/supervisor.go index 2ae065621..d225f96fc 100644 --- a/plugin/rpcplugin/rpcplugintest/supervisor.go +++ b/plugin/rpcplugin/rpcplugintest/supervisor.go @@ -174,6 +174,14 @@ func testSupervisor_PluginCrash(t *testing.T, sp SupervisorProviderFunc) { bundle := model.BundleInfoForPath(dir) supervisor, err := sp(bundle) require.NoError(t, err) + + var supervisorWaitErr error + supervisorWaitDone := make(chan bool, 1) + go func() { + supervisorWaitErr = supervisor.Wait() + close(supervisorWaitDone) + }() + require.NoError(t, supervisor.Start(&api)) failed := false @@ -189,7 +197,21 @@ func testSupervisor_PluginCrash(t *testing.T, sp SupervisorProviderFunc) { time.Sleep(time.Millisecond * 100) } assert.True(t, recovered) + + select { + case <-supervisorWaitDone: + require.Fail(t, "supervisor.Wait() unexpectedly returned") + case <-time.After(500 * time.Millisecond): + } + require.NoError(t, supervisor.Stop()) + + select { + case <-supervisorWaitDone: + require.Nil(t, supervisorWaitErr) + case <-time.After(5000 * time.Millisecond): + require.Fail(t, "supervisor.Wait() failed to return") + } } // Crashed plugins should be relaunched at most three times. @@ -239,6 +261,14 @@ func testSupervisor_PluginRepeatedlyCrash(t *testing.T, sp SupervisorProviderFun bundle := model.BundleInfoForPath(dir) supervisor, err := sp(bundle) require.NoError(t, err) + + var supervisorWaitErr error + supervisorWaitDone := make(chan bool, 1) + go func() { + supervisorWaitErr = supervisor.Wait() + close(supervisorWaitDone) + }() + require.NoError(t, supervisor.Start(&api)) for attempt := 1; attempt <= 4; attempt++ { @@ -264,10 +294,19 @@ func testSupervisor_PluginRepeatedlyCrash(t *testing.T, sp SupervisorProviderFun } if attempt < 4 { + require.Nil(t, supervisorWaitErr) require.True(t, recovered, "failed to recover after attempt %d", attempt) } else { require.False(t, recovered, "unexpectedly recovered after attempt %d", attempt) } } + + select { + case <-supervisorWaitDone: + require.NotNil(t, supervisorWaitErr) + case <-time.After(500 * time.Millisecond): + require.Fail(t, "supervisor.Wait() failed to return after plugin crashed") + } + require.NoError(t, supervisor.Stop()) } diff --git a/plugin/rpcplugin/supervisor.go b/plugin/rpcplugin/supervisor.go index 6e26d5682..246747c89 100644 --- a/plugin/rpcplugin/supervisor.go +++ b/plugin/rpcplugin/supervisor.go @@ -32,6 +32,7 @@ type Supervisor struct { cancel context.CancelFunc newProcess func(context.Context) (Process, io.ReadWriteCloser, error) pluginId string + pluginErr error } var _ plugin.Supervisor = (*Supervisor)(nil) @@ -55,6 +56,13 @@ func (s *Supervisor) Start(api plugin.API) error { } } +// Waits for the supervisor to stop (on demand or of its own accord), returning any error that +// triggered the supervisor to stop. +func (s *Supervisor) Wait() error { + <-s.done + return s.pluginErr +} + // Stops the plugin. func (s *Supervisor) Stop() error { s.cancel() @@ -70,7 +78,7 @@ func (s *Supervisor) Hooks() plugin.Hooks { func (s *Supervisor) run(ctx context.Context, start chan<- error, api plugin.API) { defer func() { - s.done <- true + close(s.done) }() done := ctx.Done() for i := 0; i <= MaxProcessRestarts; i++ { @@ -81,10 +89,11 @@ func (s *Supervisor) run(ctx context.Context, start chan<- error, api plugin.API default: start = nil if i < MaxProcessRestarts { - mlog.Debug("Plugin terminated unexpectedly", mlog.String("plugin_id", s.pluginId)) + mlog.Error("Plugin terminated unexpectedly", mlog.String("plugin_id", s.pluginId)) time.Sleep(time.Duration((1 + i*i)) * time.Second) } else { - mlog.Debug("Plugin terminated unexpectedly too many times", mlog.String("plugin_id", s.pluginId), mlog.Int("max_process_restarts", MaxProcessRestarts)) + s.pluginErr = fmt.Errorf("plugin terminated unexpectedly too many times") + mlog.Error("Plugin shutdown", mlog.String("plugin_id", s.pluginId), mlog.Int("max_process_restarts", MaxProcessRestarts), mlog.Err(s.pluginErr)) } } } |