Add graceful shutdown
This commit is contained in:
@@ -14,6 +14,7 @@ type Store interface {
|
||||
Append(id string, messages ...api.Message) (*Conversation, error)
|
||||
Delete(id string) error
|
||||
Size() int
|
||||
Close() error
|
||||
}
|
||||
|
||||
// MemoryStore manages conversation history in-memory with automatic expiration.
|
||||
@@ -21,6 +22,7 @@ type MemoryStore struct {
|
||||
conversations map[string]*Conversation
|
||||
mu sync.RWMutex
|
||||
ttl time.Duration
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// Conversation holds the message history for a single conversation thread.
|
||||
@@ -37,13 +39,14 @@ func NewMemoryStore(ttl time.Duration) *MemoryStore {
|
||||
s := &MemoryStore{
|
||||
conversations: make(map[string]*Conversation),
|
||||
ttl: ttl,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
|
||||
// Start cleanup goroutine if TTL is set
|
||||
if ttl > 0 {
|
||||
go s.cleanup()
|
||||
}
|
||||
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
@@ -140,16 +143,21 @@ func (s *MemoryStore) Delete(id string) error {
|
||||
func (s *MemoryStore) cleanup() {
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
s.mu.Lock()
|
||||
now := time.Now()
|
||||
for id, conv := range s.conversations {
|
||||
if now.Sub(conv.UpdatedAt) > s.ttl {
|
||||
delete(s.conversations, id)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
s.mu.Lock()
|
||||
now := time.Now()
|
||||
for id, conv := range s.conversations {
|
||||
if now.Sub(conv.UpdatedAt) > s.ttl {
|
||||
delete(s.conversations, id)
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
case <-s.done:
|
||||
return
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -159,3 +167,9 @@ func (s *MemoryStore) Size() int {
|
||||
defer s.mu.RUnlock()
|
||||
return len(s.conversations)
|
||||
}
|
||||
|
||||
// Close stops the cleanup goroutine and releases resources.
|
||||
func (s *MemoryStore) Close() error {
|
||||
close(s.done)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -122,3 +122,8 @@ func (s *RedisStore) Size() int {
|
||||
|
||||
return count
|
||||
}
|
||||
|
||||
// Close closes the Redis client connection.
|
||||
func (s *RedisStore) Close() error {
|
||||
return s.client.Close()
|
||||
}
|
||||
|
||||
@@ -41,6 +41,7 @@ type SQLStore struct {
|
||||
db *sql.DB
|
||||
ttl time.Duration
|
||||
dialect sqlDialect
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// NewSQLStore creates a SQL-backed conversation store. It creates the
|
||||
@@ -58,7 +59,12 @@ func NewSQLStore(db *sql.DB, driver string, ttl time.Duration) (*SQLStore, error
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s := &SQLStore{db: db, ttl: ttl, dialect: newDialect(driver)}
|
||||
s := &SQLStore{
|
||||
db: db,
|
||||
ttl: ttl,
|
||||
dialect: newDialect(driver),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
if ttl > 0 {
|
||||
go s.cleanup()
|
||||
}
|
||||
@@ -144,8 +150,19 @@ func (s *SQLStore) cleanup() {
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
cutoff := time.Now().Add(-s.ttl)
|
||||
_, _ = s.db.Exec(s.dialect.cleanup, cutoff)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
cutoff := time.Now().Add(-s.ttl)
|
||||
_, _ = s.db.Exec(s.dialect.cleanup, cutoff)
|
||||
case <-s.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close stops the cleanup goroutine and closes the database connection.
|
||||
func (s *SQLStore) Close() error {
|
||||
close(s.done)
|
||||
return s.db.Close()
|
||||
}
|
||||
|
||||
@@ -220,6 +220,10 @@ func (m *mockConversationStore) Size() int {
|
||||
return len(m.conversations)
|
||||
}
|
||||
|
||||
func (m *mockConversationStore) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockConversationStore) setConversation(id string, conv *conversation.Conversation) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
Reference in New Issue
Block a user