diff --git a/cmd/gateway/main.go b/cmd/gateway/main.go index 8c4b142..9360bfb 100644 --- a/cmd/gateway/main.go +++ b/cmd/gateway/main.go @@ -9,6 +9,8 @@ import ( "log/slog" "net/http" "os" + "os/signal" + "syscall" "time" _ "github.com/go-sql-driver/mysql" @@ -120,10 +122,44 @@ func main() { IdleTimeout: 120 * time.Second, } - logger.Info("open responses gateway listening", slog.String("address", addr)) - if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - logger.Error("server error", slog.String("error", err.Error())) - os.Exit(1) + // Set up signal handling for graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + + // Run server in a goroutine + serverErrors := make(chan error, 1) + go func() { + logger.Info("open responses gateway listening", slog.String("address", addr)) + serverErrors <- srv.ListenAndServe() + }() + + // Wait for shutdown signal or server error + select { + case err := <-serverErrors: + if err != nil && err != http.ErrServerClosed { + logger.Error("server error", slog.String("error", err.Error())) + os.Exit(1) + } + case sig := <-sigChan: + logger.Info("received shutdown signal", slog.String("signal", sig.String())) + + // Create shutdown context with timeout + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer shutdownCancel() + + // Shutdown the HTTP server gracefully + logger.Info("shutting down server gracefully") + if err := srv.Shutdown(shutdownCtx); err != nil { + logger.Error("server shutdown error", slog.String("error", err.Error())) + } + + // Close conversation store + logger.Info("closing conversation store") + if err := convStore.Close(); err != nil { + logger.Error("error closing conversation store", slog.String("error", err.Error())) + } + + logger.Info("shutdown complete") } } diff --git a/internal/conversation/conversation.go b/internal/conversation/conversation.go index ff757c8..b00b193 100644 --- a/internal/conversation/conversation.go +++ b/internal/conversation/conversation.go @@ -14,6 +14,7 @@ type Store interface { Append(id string, messages ...api.Message) (*Conversation, error) Delete(id string) error Size() int + Close() error } // MemoryStore manages conversation history in-memory with automatic expiration. @@ -21,6 +22,7 @@ type MemoryStore struct { conversations map[string]*Conversation mu sync.RWMutex ttl time.Duration + done chan struct{} } // Conversation holds the message history for a single conversation thread. @@ -37,13 +39,14 @@ func NewMemoryStore(ttl time.Duration) *MemoryStore { s := &MemoryStore{ conversations: make(map[string]*Conversation), ttl: ttl, + done: make(chan struct{}), } - + // Start cleanup goroutine if TTL is set if ttl > 0 { go s.cleanup() } - + return s } @@ -140,16 +143,21 @@ func (s *MemoryStore) Delete(id string) error { func (s *MemoryStore) cleanup() { ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() - - for range ticker.C { - s.mu.Lock() - now := time.Now() - for id, conv := range s.conversations { - if now.Sub(conv.UpdatedAt) > s.ttl { - delete(s.conversations, id) + + for { + select { + case <-ticker.C: + s.mu.Lock() + now := time.Now() + for id, conv := range s.conversations { + if now.Sub(conv.UpdatedAt) > s.ttl { + delete(s.conversations, id) + } } + s.mu.Unlock() + case <-s.done: + return } - s.mu.Unlock() } } @@ -159,3 +167,9 @@ func (s *MemoryStore) Size() int { defer s.mu.RUnlock() return len(s.conversations) } + +// Close stops the cleanup goroutine and releases resources. +func (s *MemoryStore) Close() error { + close(s.done) + return nil +} diff --git a/internal/conversation/redis_store.go b/internal/conversation/redis_store.go index 5c96ba2..146a32d 100644 --- a/internal/conversation/redis_store.go +++ b/internal/conversation/redis_store.go @@ -122,3 +122,8 @@ func (s *RedisStore) Size() int { return count } + +// Close closes the Redis client connection. +func (s *RedisStore) Close() error { + return s.client.Close() +} diff --git a/internal/conversation/sql_store.go b/internal/conversation/sql_store.go index d1a7e84..bcfd503 100644 --- a/internal/conversation/sql_store.go +++ b/internal/conversation/sql_store.go @@ -41,6 +41,7 @@ type SQLStore struct { db *sql.DB ttl time.Duration dialect sqlDialect + done chan struct{} } // NewSQLStore creates a SQL-backed conversation store. It creates the @@ -58,7 +59,12 @@ func NewSQLStore(db *sql.DB, driver string, ttl time.Duration) (*SQLStore, error return nil, err } - s := &SQLStore{db: db, ttl: ttl, dialect: newDialect(driver)} + s := &SQLStore{ + db: db, + ttl: ttl, + dialect: newDialect(driver), + done: make(chan struct{}), + } if ttl > 0 { go s.cleanup() } @@ -144,8 +150,19 @@ func (s *SQLStore) cleanup() { ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() - for range ticker.C { - cutoff := time.Now().Add(-s.ttl) - _, _ = s.db.Exec(s.dialect.cleanup, cutoff) + for { + select { + case <-ticker.C: + cutoff := time.Now().Add(-s.ttl) + _, _ = s.db.Exec(s.dialect.cleanup, cutoff) + case <-s.done: + return + } } } + +// Close stops the cleanup goroutine and closes the database connection. +func (s *SQLStore) Close() error { + close(s.done) + return s.db.Close() +} diff --git a/internal/server/mocks_test.go b/internal/server/mocks_test.go index 122937c..cbc8ccd 100644 --- a/internal/server/mocks_test.go +++ b/internal/server/mocks_test.go @@ -220,6 +220,10 @@ func (m *mockConversationStore) Size() int { return len(m.conversations) } +func (m *mockConversationStore) Close() error { + return nil +} + func (m *mockConversationStore) setConversation(id string, conv *conversation.Conversation) { m.mu.Lock() defer m.mu.Unlock()