package auth
import (
"context"
"crypto/jwt"
"fmt"
"strings"
"time"
)
// JWTAuthPlugin is an authentication plugin configured with a shared secret
// and the issuer its tokens are expected to carry.
type JWTAuthPlugin struct {
	// secretKey is the raw key material supplied at construction time.
	secretKey []byte
	// issuer is the issuer value supplied at construction time.
	issuer string
}

// NewJWTAuthPlugin returns a plugin that stores secretKey (converted to bytes)
// and issuer for later use.
func NewJWTAuthPlugin(secretKey string, issuer string) *JWTAuthPlugin {
	plugin := new(JWTAuthPlugin)
	plugin.secretKey = []byte(secretKey)
	plugin.issuer = issuer
	return plugin
}

// Name returns the fixed identifier of this plugin.
func (p *JWTAuthPlugin) Name() string {
	const pluginName = "jwt-auth"
	return pluginName
}

// Version returns the fixed version string of this plugin.
func (p *JWTAuthPlugin) Version() string {
	const pluginVersion = "1.0.0"
	return pluginVersion
}
// Authenticate validates a bearer credential and returns the identity it
// carries.
//
// token must start with the literal prefix "Bearer "; the remainder is parsed
// as a JWT whose signature is checked against the plugin's secret key. The
// token is rejected when parsing fails, when it is not signed with an HMAC
// method, when its issuer differs from the configured issuer, or when its
// expiry is in the past. ctx is accepted but not used by this implementation.
//
// On success the returned UserInfo carries the token subject as Username and
// the roles produced by extractRoles. On failure the UserInfo is nil.
func (p *JWTAuthPlugin) Authenticate(ctx context.Context, token string) (*UserInfo, error) {
	const bearerPrefix = "Bearer "
	if !strings.HasPrefix(token, bearerPrefix) {
		return nil, fmt.Errorf("invalid token format")
	}
	tokenString := strings.TrimPrefix(token, bearerPrefix)

	claims := &jwt.StandardClaims{}
	parsedToken, err := jwt.ParseWithClaims(tokenString, claims, func(t *jwt.Token) (interface{}, error) {
		// The secret is an HMAC key, so refuse any token that declares a
		// different algorithm family instead of trusting the token header.
		if _, ok := t.Method.(*jwt.SigningMethodHMAC); !ok {
			return nil, fmt.Errorf("unexpected signing method: %v", t.Header["alg"])
		}
		return p.secretKey, nil
	})
	if err != nil {
		// Keep the cause so callers and logs can tell why parsing failed.
		return nil, fmt.Errorf("invalid token: %w", err)
	}
	if !parsedToken.Valid {
		return nil, fmt.Errorf("invalid token")
	}
	if claims.Issuer != p.issuer {
		return nil, fmt.Errorf("invalid issuer")
	}
	// A token without an expiry has ExpiresAt == 0 and is therefore rejected
	// here as expired.
	if time.Now().Unix() > claims.ExpiresAt {
		return nil, fmt.Errorf("token expired")
	}
	return &UserInfo{
		Username: claims.Subject,
		Roles:    extractRoles(claims),
	}, nil
}
// Authorize reports whether user may perform action on resource.
//
// It returns nil when any permission returned by getPermissions for the
// user's roles matches both resource and action exactly, and an
// "access denied" error otherwise. A nil user is denied rather than
// dereferenced.
func (p *JWTAuthPlugin) Authorize(user *UserInfo, resource string, action string) error {
	if user == nil {
		// Callers that ignored a failed Authenticate would otherwise panic here.
		return fmt.Errorf("access denied: no authenticated user for %s on %s", action, resource)
	}
	for _, permission := range getPermissions(user.Roles) {
		if permission.Resource == resource && permission.Action == action {
			return nil
		}
	}
	return fmt.Errorf("access denied: user %s cannot %s on %s",
		user.Username, action, resource)
}
// UserInfo describes an authenticated user.
type UserInfo struct {
	// Username is filled from the token subject by Authenticate.
	Username string
	// Roles is filled from extractRoles by Authenticate and passed to
	// getPermissions by Authorize.
	Roles []string
}
// Permission pairs a resource with an action; Authorize grants access when
// both fields equal the requested resource and action.
type Permission struct {
	// Resource is compared for exact equality with the requested resource.
	Resource string
	// Action is compared for exact equality with the requested action.
	Action string
}
Developing Metrics and Monitoring Plugins
Observability is crucial for maintaining healthy Milvus deployments. Custom monitoring plugins enable integration with enterprise monitoring stacks and provide domain-specific metrics.
package monitoring
import (
"context"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// MetricsPlugin groups the Prometheus collectors used to observe queries,
// index operations, vector counts and errors.
//
// The collectors are kept as the pointers returned by their constructors
// rather than as dereferenced copies of the vec structs.
type MetricsPlugin struct {
	// queryDuration is labelled by "collection" and "operation_type".
	queryDuration *prometheus.HistogramVec
	// indexOperations is labelled by "collection", "index_type" and "status".
	indexOperations *prometheus.CounterVec
	// vectorCount is labelled by "collection".
	vectorCount *prometheus.GaugeVec
	// errorRate is labelled by "operation" and "error_type".
	errorRate *prometheus.CounterVec
}

// NewMetricsPlugin creates the four collectors through promauto and returns a
// plugin holding them.
//
// NOTE(review): promauto constructors register with the default registry, so
// constructing a second MetricsPlugin in the same process is expected to
// panic on duplicate registration - confirm before calling this more than once.
func NewMetricsPlugin() *MetricsPlugin {
	return &MetricsPlugin{
		queryDuration: promauto.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "milvus_query_duration_seconds",
				Help:    "Duration of query operations",
				Buckets: prometheus.DefBuckets,
			},
			[]string{"collection", "operation_type"},
		),
		indexOperations: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "milvus_index_operations_total",
				Help: "Total number of index operations",
			},
			[]string{"collection", "index_type", "status"},
		),
		vectorCount: promauto.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "milvus_vectors_total",
				Help: "Total number of vectors in collections",
			},
			[]string{"collection"},
		),
		errorRate: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "milvus_errors_total",
				Help: "Total number of errors",
			},
			[]string{"operation", "error_type"},
		),
	}
}
// RecordQueryDuration observes duration, expressed in seconds, on the query
// duration histogram for the given collection and operation labels.
func (m *MetricsPlugin) RecordQueryDuration(collection string, operation string, duration time.Duration) {
	observer := m.queryDuration.WithLabelValues(collection, operation)
	seconds := duration.Seconds()
	observer.Observe(seconds)
}
// RecordIndexOperation increments the index operation counter for the given
// collection, index type and status labels.
func (m *MetricsPlugin) RecordIndexOperation(collection string, indexType string, status string) {
	counter := m.indexOperations.WithLabelValues(collection, indexType, status)
	counter.Inc()
}
// UpdateVectorCount sets the vector count gauge for collection to count.
func (m *MetricsPlugin) UpdateVectorCount(collection string, count float64) {
	gauge := m.vectorCount.WithLabelValues(collection)
	gauge.Set(count)
}
// RecordError increments the error counter for the given operation and error
// type labels.
func (m *MetricsPlugin) RecordError(operation string, errorType string) {
	counter := m.errorRate.WithLabelValues(operation, errorType)
	counter.Inc()
}
// QueryMiddleware wraps next so that metrics are collected automatically.
//
// The returned handler times the call to next, records the elapsed time under
// the request's Collection and OperationType, and, when next returns an error,
// records it under the "query" operation with the type chosen by
// classifyError. The response and error from next are returned unchanged.
func (m *MetricsPlugin) QueryMiddleware(next QueryHandler) QueryHandler {
	instrumented := func(ctx context.Context, req *QueryRequest) (*QueryResponse, error) {
		began := time.Now()
		resp, err := next(ctx, req)
		elapsed := time.Since(began)

		m.RecordQueryDuration(req.Collection, req.OperationType, elapsed)
		if err == nil {
			return resp, nil
		}
		m.RecordError("query", classifyError(err))
		return resp, err
	}
	return instrumented
}
Third-Party Integration Patterns
Modern applications require seamless integration with various external systems. Milvus plugins can facilitate connections to message queues, data lakes, and other databases.
package integrations
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)
// KafkaIntegrationPlugin bundles a Kafka producer and consumer with a lookup
// table of topic names.
type KafkaIntegrationPlugin struct {
	// producer is used by PublishVectorEvent to send messages.
	producer *kafka.Producer
	// consumer is used by ConsumeDataUpdates to subscribe and read messages.
	consumer *kafka.Consumer
	// topics maps a logical name to a Kafka topic; the methods of this type
	// look up the keys "vector_events" and "data_updates".
	topics map[string]string
}
// NewKafkaIntegration creates a producer and a consumer against brokers and
// returns a plugin that uses topics to resolve logical topic names.
//
// The producer is configured with acks "all"; the consumer joins the group
// "milvus-consumer" and starts from the earliest offset when it has none.
// If the consumer cannot be created, the already-created producer is closed
// before the error is returned so that it is not leaked.
func NewKafkaIntegration(brokers string, topics map[string]string) (*KafkaIntegrationPlugin, error) {
	producer, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": brokers,
		"acks":              "all",
	})
	if err != nil {
		return nil, fmt.Errorf("creating kafka producer: %w", err)
	}

	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": brokers,
		"group.id":          "milvus-consumer",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		// Nothing else owns the producer yet; release it here.
		producer.Close()
		return nil, fmt.Errorf("creating kafka consumer: %w", err)
	}

	return &KafkaIntegrationPlugin{
		producer: producer,
		consumer: consumer,
		topics:   topics,
	}, nil
}
// PublishVectorEvent encodes event as JSON and hands it to the producer for
// the topic registered under "vector_events", letting the client choose the
// partition.
//
// It returns an error when the event cannot be encoded, when no topic is
// configured under "vector_events", or when the producer rejects the message.
// No delivery channel is passed to Produce, so a nil return only means the
// message was accepted by the producer.
func (k *KafkaIntegrationPlugin) PublishVectorEvent(event *VectorEvent) error {
	data, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("encoding vector event: %w", err)
	}

	// A missing map entry yields "", which would otherwise be used as the
	// topic name.
	topic, ok := k.topics["vector_events"]
	if !ok || topic == "" {
		return fmt.Errorf("no topic configured for %q", "vector_events")
	}

	return k.producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &topic,
			Partition: kafka.PartitionAny,
		},
		Value: data,
	}, nil)
}
// ConsumeDataUpdates subscribes to the topic registered under "data_updates"
// and feeds every decodable message to handler until ctx is done.
//
// It returns an error when no topic is configured or the subscription fails,
// and ctx.Err() once ctx is cancelled. Read errors, payloads that do not
// decode as a DataUpdate, and handler failures skip the current message
// without committing; a commit is attempted only after handler succeeds.
func (k *KafkaIntegrationPlugin) ConsumeDataUpdates(ctx context.Context, handler DataUpdateHandler) error {
	// ReadMessage takes a time.Duration, so an untyped 100 means 100ns and
	// turns the loop into a busy spin. Poll for 100ms instead.
	const pollTimeout = 100 * time.Millisecond

	topic, ok := k.topics["data_updates"]
	if !ok || topic == "" {
		return fmt.Errorf("no topic configured for %q", "data_updates")
	}
	if err := k.consumer.Subscribe(topic, nil); err != nil {
		return fmt.Errorf("subscribing to %q: %w", topic, err)
	}

	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		msg, err := k.consumer.ReadMessage(pollTimeout)
		if err != nil {
			// An expired poll is the normal idle case; other read errors are
			// retried on the next iteration.
			continue
		}

		var update DataUpdate
		if err := json.Unmarshal(msg.Value, &update); err != nil {
			// Undecodable payload: skip it without committing.
			continue
		}
		if err := handler.Handle(&update); err != nil {
			// Handler failure: skip without committing.
			continue
		}

		// Best-effort commit. NOTE(review): a commit can presumably fail
		// harmlessly (for example when nothing new is left to commit), so a
		// failure here must not stop consumption - confirm against the client
		// configuration in use.
		_, _ = k.consumer.Commit()
	}
}
// S3BackupPlugin writes collection backups to a single S3 bucket.
type S3BackupPlugin struct {
	// s3Client is the client used to upload backup objects.
	s3Client *s3.S3
	// bucket is the name of the bucket that receives the objects.
	bucket string
}
// NewS3BackupPlugin opens an AWS session for region and returns a plugin that
// uploads to bucket through an S3 client built on that session.
//
// The session is created through session.Must, so by the Must convention a
// failure to create it panics instead of being returned.
func NewS3BackupPlugin(region, bucket string) *S3BackupPlugin {
	cfg := &aws.Config{Region: aws.String(region)}
	sess := session.Must(session.NewSession(cfg))

	plugin := &S3BackupPlugin{bucket: bucket}
	plugin.s3Client = s3.New(sess)
	return plugin
}
// BackupCollection uploads data to the plugin's bucket under the key
// "backups/<collection>/<unix-seconds>.backup", where the timestamp is the
// current time at the moment of the call. It returns the error from the
// upload, if any.
func (s *S3BackupPlugin) BackupCollection(collection string, data []byte) error {
	stamp := time.Now().Unix()
	input := &s3.PutObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(fmt.Sprintf("backups/%s/%d.backup", collection, stamp)),
		Body:   bytes.NewReader(data),
	}
	if _, err := s.s3Client.PutObject(input); err != nil {
		return err
	}
	return nil
}
Plugin Registration and Lifecycle Management
Proper plugin lifecycle management ensures stable operation and clean resource handling.
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
)
// PluginManager keeps the registered plugins together with a cancellable
// context that Run blocks on.
type PluginManager struct {
	// plugins holds the registered plugins in registration order; StartAll
	// walks it forwards and StopAll walks it backwards.
	plugins []Plugin
	// ctx is created by NewPluginManager; Run waits for it to be done.
	ctx context.Context
	// cancel cancels ctx.
	cancel context.CancelFunc
}
// NewPluginManager returns a manager with an empty plugin list and a fresh
// cancellable context derived from context.Background.
func NewPluginManager() *PluginManager {
	manager := &PluginManager{plugins: make([]Plugin, 0)}
	manager.ctx, manager.cancel = context.WithCancel(context.Background())
	return manager
}
// RegisterPlugin initializes plugin with the configuration returned by
// getPluginConfig for its name and, on success, appends it to the manager's
// plugin list and logs its name and version.
//
// If initialization fails the plugin is not added and the wrapped error is
// returned.
func (pm *PluginManager) RegisterPlugin(plugin Plugin) error {
	config := getPluginConfig(plugin.Name())
	err := plugin.Init(config)
	if err != nil {
		return fmt.Errorf("failed to initialize plugin %s: %w", plugin.Name(), err)
	}

	pm.plugins = append(pm.plugins, plugin)
	log.Printf("Registered plugin: %s v%s", plugin.Name(), plugin.Version())
	return nil
}
// StartAll starts every registered plugin in registration order and logs each
// successful start.
//
// If a plugin fails to start, the plugins that were already started by this
// call are stopped again in reverse order, so a failed StartAll does not leave
// a partially running set behind, and the wrapped start error is returned.
// Errors from those rollback stops are logged, not returned.
func (pm *PluginManager) StartAll() error {
	for i, plugin := range pm.plugins {
		if err := plugin.Start(); err != nil {
			pm.stopStarted(i)
			return fmt.Errorf("failed to start plugin %s: %w", plugin.Name(), err)
		}
		log.Printf("Started plugin: %s", plugin.Name())
	}
	return nil
}

// stopStarted stops plugins[0:count] in reverse order, logging any stop error.
func (pm *PluginManager) stopStarted(count int) {
	for j := count - 1; j >= 0; j-- {
		started := pm.plugins[j]
		if err := started.Stop(); err != nil {
			log.Printf("Error stopping plugin %s: %v", started.Name(), err)
		}
	}
}
// StopAll stops every registered plugin in reverse registration order.
// A failure to stop one plugin is logged and does not prevent the remaining
// plugins from being stopped.
func (pm *PluginManager) StopAll() {
	for remaining := len(pm.plugins); remaining > 0; remaining-- {
		current := pm.plugins[remaining-1]
		err := current.Stop()
		if err == nil {
			log.Printf("Stopped plugin: %s", current.Name())
			continue
		}
		log.Printf("Error stopping plugin %s: %v", current.Name(), err)
	}
}
// Run blocks until SIGINT or SIGTERM arrives or the manager's context is
// done.
//
// On a signal it cancels the context and stops all plugins before returning,
// so shutdown has completed by the time Run returns. The shutdown work runs
// on the calling goroutine: cancelling the context from a separate goroutine
// would unblock Run immediately and let the process exit while plugins are
// still being stopped. If the context is done for any other reason, Run
// returns without stopping plugins.
func (pm *PluginManager) Run() {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	// Undo the registration so signals regain their default behavior once
	// Run is no longer listening.
	defer signal.Stop(sigChan)

	select {
	case <-sigChan:
		log.Println("Received shutdown signal, stopping plugins...")
		pm.cancel()
		pm.StopAll()
	case <-pm.ctx.Done():
	}
}
// main builds the auth, metrics and Kafka plugins, registers and starts them,
// and then blocks in the manager's Run loop. Any failure while constructing,
// registering or starting a plugin terminates the program.
func main() {
	manager := NewPluginManager()

	// NOTE(review): the signing secret is hard-coded here; load it from
	// configuration before using this outside an example.
	authPlugin := NewJWTAuthPlugin("secret-key", "milvus-issuer")
	metricsPlugin := NewMetricsPlugin()
	kafkaPlugin, err := NewKafkaIntegration("localhost:9092", map[string]string{
		"vector_events": "milvus-events",
		"data_updates":  "milvus-updates",
	})
	if err != nil {
		// Registering a nil plugin would only fail later and less clearly.
		log.Fatalf("Failed to create Kafka integration: %v", err)
	}

	// Register plugins in the same order as before: auth, metrics, Kafka.
	for _, plugin := range []Plugin{authPlugin, metricsPlugin, kafkaPlugin} {
		if err := manager.RegisterPlugin(plugin); err != nil {
			log.Fatalf("Failed to register plugin: %v", err)
		}
	}

	if err := manager.StartAll(); err != nil {
		log.Fatalf("Failed to start plugins: %v", err)
	}
	log.Println("All plugins started successfully")
	manager.Run()
}
Best Practices and Performance Considerations
When developing Milvus plugins, consider these optimization strategies:
- Asynchronous Processing: Use goroutines and channels for non-blocking operations
- Connection Pooling: Implement connection pools for external service integrations
- Caching: Cache frequently accessed data to reduce latency
- Error Handling: Implement robust error handling with proper logging
- Resource Management: Always clean up resources in plugin stop methods
- Configuration Management: Use environment variables and configuration files for flexibility
Conclusion
Milvus’s plugin architecture provides a powerful foundation for building customized vector database solutions. By implementing custom data types, authentication mechanisms, monitoring systems, and third-party integrations, developers can create highly specialized deployments that meet specific enterprise requirements.
The examples provided demonstrate practical patterns for extending Milvus functionality while maintaining system stability and performance. As vector databases continue to evolve, the ability to customize and extend core functionality through plugins will remain a critical capability for enterprise AI applications.
Remember to thoroughly test plugins in development environments and implement proper monitoring to ensure production stability. The investment in custom plugins pays dividends in operational efficiency and system integration capabilities.
Further Reading:
Milvus Migration Strategies
PostgreSQL Threat Modeling for FinTech
Optimizing Azure Database for MySQL
Securing User Accounts in PostgreSQL
Terminating Non-Responsive Redis Instances in a Redis Cluster
Be the first to comment