Extending Milvus with Custom Plugins and Extensions

Vector databases have become the backbone of many modern AI applications, and Milvus is one of the leading open-source options. Milvus covers most workloads out of the box, but production deployments often need behavior it does not ship with: custom data handling, enterprise authentication, domain-specific metrics, or links to external systems. This guide walks through plugin and extension patterns for adding that behavior, and shows where custom plugins improve functionality and performance.

Understanding Milvus Plugin Architecture

Milvus employs a microservices architecture that naturally supports plugin-based extensions. The system is built around several core components that can be extended:

  • Query Coordinator: Manages query execution and routing
  • Data Coordinator: Handles data ingestion and storage
  • Index Coordinator: Manages vector indexing operations (newer Milvus releases fold this role into the Data Coordinator)
  • Proxy: Acts as the gateway for client requests

The plugin pattern used throughout this guide rests on a small, well-defined interface, which lets developers inject custom logic at specific points in the data pipeline without modifying core Milvus code. The lifecycle contract looks like this:

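// Plugin is the lifecycle contract every extension in this guide implements.
type Plugin interface {
    // Name returns a unique identifier used in logs and configuration lookups.
    Name() string
    // Version returns the plugin's version string.
    Version() string
    // Init receives the plugin's configuration before the plugin is started.
    Init(config map[string]interface{}) error
    // Start begins any background work the plugin performs.
    Start() error
    // Stop releases the plugin's resources.
    Stop() error
}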
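To make the contract concrete, here is a minimal sketch of a type that satisfies it. EchoPlugin, its "prefix" setting, and its version string are hypothetical names chosen for illustration; the Plugin interface is repeated only so the example compiles on its own.

```go
package main

import (
    "errors"
    "fmt"
)

// Plugin repeats the lifecycle interface shown above.
type Plugin interface {
    Name() string
    Version() string
    Init(config map[string]interface{}) error
    Start() error
    Stop() error
}

// EchoPlugin is a hypothetical plugin that only tracks its lifecycle state.
type EchoPlugin struct {
    prefix  string
    running bool
}

func (p *EchoPlugin) Name() string    { return "echo" }
func (p *EchoPlugin) Version() string { return "0.1.0" }

// Init reads an optional "prefix" setting and rejects values of the wrong type.
func (p *EchoPlugin) Init(config map[string]interface{}) error {
    p.prefix = "echo"
    if raw, ok := config["prefix"]; ok {
        s, isString := raw.(string)
        if !isString {
            return fmt.Errorf("prefix must be a string, got %T", raw)
        }
        p.prefix = s
    }
    return nil
}

// Start refuses to run twice so that a double start surfaces early.
func (p *EchoPlugin) Start() error {
    if p.running {
        return errors.New("echo plugin already started")
    }
    p.running = true
    return nil
}

// Stop is idempotent: stopping a stopped plugin is not an error.
func (p *EchoPlugin) Stop() error {
    p.running = false
    return nil
}

// Compile-time check that EchoPlugin satisfies Plugin.
var _ Plugin = (*EchoPlugin)(nil)

func main() {
    var p Plugin = &EchoPlugin{}
    if err := p.Init(map[string]interface{}{"prefix": "demo"}); err != nil {
        fmt.Println("init failed:", err)
        return
    }
    if err := p.Start(); err != nil {
        fmt.Println("start failed:", err)
        return
    }
    defer p.Stop()
    fmt.Printf("%s v%s is running\n", p.Name(), p.Version())
}
```

The compile-time assertion is a cheap way to catch a plugin that drifts out of sync with the interface before it is ever registered.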

Implementing Custom Data Types

Milvus supports a range of data types natively, but enterprise applications often need specialized handling for values such as geographic coordinates. A custom data type is implemented as a handler that serializes, deserializes, and validates values before they reach Milvus's storage and query engine, as the GeoPoint example shows:

package customtypes

import (
    "encoding/json"
    "fmt"

    "github.com/milvus-io/milvus/pkg/util/typeutil"
)

type GeoPointType struct {
    Latitude  float64 `json:"lat"`
    Longitude float64 `json:"lng"`
}

type GeoPointHandler struct{}

func (h *GeoPointHandler) Serialize(data interface{}) ([]byte, error) {
    geoPoint, ok := data.(GeoPointType)
    if !ok {
        return nil, fmt.Errorf("invalid data type for GeoPoint")
    }
    return json.Marshal(geoPoint)
}

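func (h *GeoPointHandler) Deserialize(data []byte) (interface{}, error) {
    var geoPoint GeoPointType
    if err := json.Unmarshal(data, &geoPoint); err != nil {
        // Return nil rather than a half-populated value on malformed input.
        return nil, fmt.Errorf("decode GeoPoint: %w", err)
    }
    return geoPoint, nil
}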

func (h *GeoPointHandler) Validate(data interface{}) error {
    geoPoint, ok := data.(GeoPointType)
    if !ok {
        return fmt.Errorf("invalid data type")
    }

    if geoPoint.Latitude < -90 || geoPoint.Latitude > 90 {
        return fmt.Errorf("invalid latitude: %f", geoPoint.Latitude)
    }

    if geoPoint.Longitude < -180 || geoPoint.Longitude > 180 {
        return fmt.Errorf("invalid longitude: %f", geoPoint.Longitude)
    }

    return nil
}

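// Register the custom type.
// NOTE: RegisterCustomType is shown as an illustrative registration hook.
// Confirm that the Milvus version you build against exposes it (or an
// equivalent) before relying on it.
func init() {
    typeutil.RegisterCustomType("geopoint", &GeoPointHandler{})
}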

Building Authentication and Authorization Extensions

Security is paramount in production vector database deployments. Milvus provides hooks for implementing custom authentication and authorization mechanisms that integrate with existing enterprise identity systems. The plugin below validates JWT bearer tokens and then applies role-based authorization.

package auth

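import (
    "context"
    "fmt"
    "strings"
    "time"

    // The Go standard library has no JWT package; golang-jwt is a widely
    // used third-party implementation that provides StandardClaims.
    "github.com/golang-jwt/jwt/v4"
)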

type JWTAuthPlugin struct {
    secretKey []byte
    issuer    string
}

func NewJWTAuthPlugin(secretKey string, issuer string) *JWTAuthPlugin {
    return &JWTAuthPlugin{
        secretKey: []byte(secretKey),
        issuer:    issuer,
    }
}

func (p *JWTAuthPlugin) Name() string {
    return "jwt-auth"
}

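func (p *JWTAuthPlugin) Version() string {
    return "1.0.0"
}

// Init, Start, and Stop complete the Plugin interface. The plugin holds no
// background resources, so Start and Stop are no-ops.
func (p *JWTAuthPlugin) Init(config map[string]interface{}) error {
    if len(p.secretKey) == 0 {
        return fmt.Errorf("jwt-auth: secret key must not be empty")
    }
    return nil
}

func (p *JWTAuthPlugin) Start() error { return nil }
func (p *JWTAuthPlugin) Stop() error  { return nil }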

func (p *JWTAuthPlugin) Authenticate(ctx context.Context, token string) (*UserInfo, error) {
    if !strings.HasPrefix(token, "Bearer ") {
        return nil, fmt.Errorf("invalid token format")
    }

    tokenString := strings.TrimPrefix(token, "Bearer ")

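    claims := &jwt.StandardClaims{}
    parsedToken, err := jwt.ParseWithClaims(tokenString, claims, func(token *jwt.Token) (interface{}, error) {
        // Accept only HMAC-signed tokens so the sender cannot choose the algorithm.
        if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
            return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
        }
        return p.secretKey, nil
    })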

    if err != nil || !parsedToken.Valid {
        return nil, fmt.Errorf("invalid token")
    }

    if claims.Issuer != p.issuer {
        return nil, fmt.Errorf("invalid issuer")
    }

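    // ParseWithClaims already rejects expired tokens; this check also rejects
    // tokens that omit the expiry claim (ExpiresAt == 0).
    if time.Now().Unix() > claims.ExpiresAt {
        return nil, fmt.Errorf("token expired")
    }

    // extractRoles is an application-defined helper. StandardClaims carries no
    // role information, so roles must come from a custom claims type or an
    // external directory keyed by the subject.
    return &UserInfo{
        Username: claims.Subject,
        Roles:    extractRoles(claims),
    }, nil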
}

func (p *JWTAuthPlugin) Authorize(user *UserInfo, resource string, action string) error {
    permissions := getPermissions(user.Roles)

    for _, permission := range permissions {
        if permission.Resource == resource && permission.Action == action {
            return nil
        }
    }

    return fmt.Errorf("access denied: user %s cannot %s on %s", 
        user.Username, action, resource)
}

type UserInfo struct {
    Username string
    Roles    []string
}

type Permission struct {
    Resource string
    Action   string
}

Developing Metrics and Monitoring Plugins

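Observability is crucial for maintaining healthy Milvus deployments. Custom monitoring plugins enable integration with enterprise monitoring stacks and provide domain-specific metrics. The plugin below exports query latency, index operations, vector counts, and error counts through the Prometheus Go client.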

package monitoring

import (
    "context"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

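type MetricsPlugin struct {
    queryDuration   *prometheus.HistogramVec
    indexOperations *prometheus.CounterVec
    vectorCount     *prometheus.GaugeVec
    errorRate       *prometheus.CounterVec
}

// NewMetricsPlugin registers its collectors with the default Prometheus
// registry, so it must be called only once per process.
func NewMetricsPlugin() *MetricsPlugin {
    return &MetricsPlugin{
        // Keep the pointers promauto returns instead of copying the collectors.
        queryDuration: promauto.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "milvus_query_duration_seconds",
                Help:    "Duration of query operations",
                Buckets: prometheus.DefBuckets,
            },
            []string{"collection", "operation_type"},
        ),

        indexOperations: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Name: "milvus_index_operations_total",
                Help: "Total number of index operations",
            },
            []string{"collection", "index_type", "status"},
        ),

        vectorCount: promauto.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: "milvus_vectors_total",
                Help: "Total number of vectors in collections",
            },
            []string{"collection"},
        ),

        errorRate: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Name: "milvus_errors_total",
                Help: "Total number of errors",
            },
            []string{"operation", "error_type"},
        ),
    }
}

// Lifecycle methods so MetricsPlugin satisfies the Plugin interface.
func (m *MetricsPlugin) Name() string                             { return "metrics" }
func (m *MetricsPlugin) Version() string                          { return "1.0.0" }
func (m *MetricsPlugin) Init(config map[string]interface{}) error { return nil }
func (m *MetricsPlugin) Start() error                             { return nil }
func (m *MetricsPlugin) Stop() error                              { return nil }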

func (m *MetricsPlugin) RecordQueryDuration(collection string, operation string, duration time.Duration) {
    m.queryDuration.WithLabelValues(collection, operation).Observe(duration.Seconds())
}

func (m *MetricsPlugin) RecordIndexOperation(collection string, indexType string, status string) {
    m.indexOperations.WithLabelValues(collection, indexType, status).Inc()
}

func (m *MetricsPlugin) UpdateVectorCount(collection string, count float64) {
    m.vectorCount.WithLabelValues(collection).Set(count)
}

func (m *MetricsPlugin) RecordError(operation string, errorType string) {
    m.errorRate.WithLabelValues(operation, errorType).Inc()
}

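// QueryRequest, QueryResponse, and QueryHandler are placeholder types for this
// example; substitute the request and response types of your integration point.
type QueryRequest struct {
    Collection    string
    OperationType string
}

type QueryResponse struct{}

type QueryHandler func(ctx context.Context, req *QueryRequest) (*QueryResponse, error)

// Middleware for automatic metrics collection.
// classifyError is an application-defined helper that maps an error to a
// short, fixed label.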
func (m *MetricsPlugin) QueryMiddleware(next QueryHandler) QueryHandler {
    return func(ctx context.Context, req *QueryRequest) (*QueryResponse, error) {
        start := time.Now()

        resp, err := next(ctx, req)

        duration := time.Since(start)
        m.RecordQueryDuration(req.Collection, req.OperationType, duration)

        if err != nil {
            m.RecordError("query", classifyError(err))
        }

        return resp, err
    }
}

Third-Party Integration Patterns

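Modern applications require seamless integration with various external systems. Milvus plugins can facilitate connections to message queues, data lakes, and other databases. The example below publishes vector events to Kafka, consumes data updates from Kafka, and writes collection backups to Amazon S3.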

package integrations

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

type KafkaIntegrationPlugin struct {
    producer *kafka.Producer
    consumer *kafka.Consumer
    topics   map[string]string
}

func NewKafkaIntegration(brokers string, topics map[string]string) (*KafkaIntegrationPlugin, error) {
    producer, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": brokers,
        "acks":             "all",
    })
    if err != nil {
        return nil, err
    }

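    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": brokers,
        "group.id":          "milvus-consumer",
        "auto.offset.reset": "earliest",
        // Offsets are committed manually after each update is handled.
        "enable.auto.commit": false,
    })
    if err != nil {
        producer.Close() // do not leak the producer created above
        return nil, err
    }

    return &KafkaIntegrationPlugin{
        producer: producer,
        consumer: consumer,
        topics:   topics,
    }, nil
}

// Lifecycle methods so the integration satisfies the Plugin interface.
func (k *KafkaIntegrationPlugin) Name() string                             { return "kafka-integration" }
func (k *KafkaIntegrationPlugin) Version() string                          { return "1.0.0" }
func (k *KafkaIntegrationPlugin) Init(config map[string]interface{}) error { return nil }
func (k *KafkaIntegrationPlugin) Start() error                             { return nil }

// Stop flushes pending messages and releases both Kafka clients.
func (k *KafkaIntegrationPlugin) Stop() error {
    k.producer.Flush(5000) // wait up to 5 seconds for outstanding deliveries
    k.producer.Close()
    return k.consumer.Close()
}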

func (k *KafkaIntegrationPlugin) PublishVectorEvent(event *VectorEvent) error {
    data, err := json.Marshal(event)
    if err != nil {
        return err
    }

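    topic, ok := k.topics["vector_events"]
    if !ok {
        return fmt.Errorf("no topic configured for vector_events")
    }

    // With a nil delivery channel, delivery reports arrive on
    // k.producer.Events(); drain that channel elsewhere to detect failures.
    return k.producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &topic,
            Partition: kafka.PartitionAny,
        },
        Value: data,
    }, nil)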
}

func (k *KafkaIntegrationPlugin) ConsumeDataUpdates(ctx context.Context, handler DataUpdateHandler) error {
    topic := k.topics["data_updates"]
    err := k.consumer.Subscribe(topic, nil)
    if err != nil {
        return err
    }

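    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            // ReadMessage takes a time.Duration; a bare 100 would mean 100 ns.
            msg, err := k.consumer.ReadMessage(100 * time.Millisecond)
            if err != nil {
                // Timeouts are expected while the topic is idle.
                continue
            }

            var update DataUpdate
            if err := json.Unmarshal(msg.Value, &update); err != nil {
                // Malformed payload: skip it (consider a dead-letter topic).
                continue
            }

            if err := handler.Handle(&update); err != nil {
                // Handle the error appropriately (log, retry, or dead-letter);
                // the offset for this message is not committed here.
                continue
            }

            // Commit only the message that was just processed.
            if _, err := k.consumer.CommitMessage(msg); err != nil {
                return fmt.Errorf("commit offset: %w", err)
            }
        }
    }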
}

type S3BackupPlugin struct {
    s3Client *s3.S3
    bucket   string
}

func NewS3BackupPlugin(region, bucket string) *S3BackupPlugin {
    sess := session.Must(session.NewSession(&aws.Config{
        Region: aws.String(region),
    }))

    return &S3BackupPlugin{
        s3Client: s3.New(sess),
        bucket:   bucket,
    }
}

func (s *S3BackupPlugin) BackupCollection(collection string, data []byte) error {
    key := fmt.Sprintf("backups/%s/%d.backup", collection, time.Now().Unix())

    _, err := s.s3Client.PutObject(&s3.PutObjectInput{
        Bucket: aws.String(s.bucket),
        Key:    aws.String(key),
        Body:   bytes.NewReader(data),
    })

    return err
}

Plugin Registration and Lifecycle Management

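Proper plugin lifecycle management ensures stable operation and clean resource handling. The manager below initializes each plugin at registration, starts plugins in registration order, and stops them in reverse order on shutdown.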

package main

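import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
)

// Plugin is the lifecycle interface defined earlier in this guide.
// getPluginConfig is an application-defined helper that returns the settings
// for a plugin by name.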

type PluginManager struct {
    plugins []Plugin
    ctx     context.Context
    cancel  context.CancelFunc
}

func NewPluginManager() *PluginManager {
    ctx, cancel := context.WithCancel(context.Background())
    return &PluginManager{
        plugins: make([]Plugin, 0),
        ctx:     ctx,
        cancel:  cancel,
    }
}

func (pm *PluginManager) RegisterPlugin(plugin Plugin) error {
    if err := plugin.Init(getPluginConfig(plugin.Name())); err != nil {
        return fmt.Errorf("failed to initialize plugin %s: %w", plugin.Name(), err)
    }

    pm.plugins = append(pm.plugins, plugin)
    log.Printf("Registered plugin: %s v%s", plugin.Name(), plugin.Version())
    return nil
}

func (pm *PluginManager) StartAll() error {
    for _, plugin := range pm.plugins {
        if err := plugin.Start(); err != nil {
            return fmt.Errorf("failed to start plugin %s: %w", plugin.Name(), err)
        }
        log.Printf("Started plugin: %s", plugin.Name())
    }
    return nil
}

func (pm *PluginManager) StopAll() {
    for i := len(pm.plugins) - 1; i >= 0; i-- {
        plugin := pm.plugins[i]
        if err := plugin.Stop(); err != nil {
            log.Printf("Error stopping plugin %s: %v", plugin.Name(), err)
        } else {
            log.Printf("Stopped plugin: %s", plugin.Name())
        }
    }
}

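func (pm *PluginManager) Run() {
    // Block until a shutdown signal arrives or the manager's context is cancelled.
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    defer signal.Stop(sigChan)

    select {
    case <-sigChan:
        log.Println("Received shutdown signal, stopping plugins...")
        pm.cancel()
    case <-pm.ctx.Done():
    }

    // Stop plugins on the calling goroutine so shutdown finishes before Run
    // returns; stopping them in a separate goroutine could race with exit.
    pm.StopAll()
}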

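func main() {
    manager := NewPluginManager()

    // Construct plugins. The secret is hard-coded only for brevity; load it
    // from configuration or a secret store in a real deployment.
    authPlugin := NewJWTAuthPlugin("secret-key", "milvus-issuer")
    metricsPlugin := NewMetricsPlugin()
    kafkaPlugin, err := NewKafkaIntegration("localhost:9092", map[string]string{
        "vector_events": "milvus-events",
        "data_updates":  "milvus-updates",
    })
    if err != nil {
        log.Fatalf("Failed to create Kafka plugin: %v", err)
    }

    // Register plugins. Each one must implement the full Plugin interface.
    for _, plugin := range []Plugin{authPlugin, metricsPlugin, kafkaPlugin} {
        if err := manager.RegisterPlugin(plugin); err != nil {
            log.Fatalf("Failed to register plugin: %v", err)
        }
    }

    if err := manager.StartAll(); err != nil {
        log.Fatalf("Failed to start plugins: %v", err)
    }

    log.Println("All plugins started successfully")
    manager.Run()
}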

Best Practices and Performance Considerations

When developing Milvus plugins, consider these optimization strategies:

  1. Asynchronous Processing: Use goroutines and channels for non-blocking operations
  2. Connection Pooling: Implement connection pools for external service integrations
  3. Caching: Cache frequently accessed data to reduce latency
  4. Error Handling: Implement robust error handling with proper logging
  5. Resource Management: Always clean up resources in plugin stop methods
  6. Configuration Management: Use environment variables and configuration files for flexibility

Conclusion

Milvus’s plugin architecture provides a powerful foundation for building customized vector database solutions. By implementing custom data types, authentication mechanisms, monitoring systems, and third-party integrations, developers can create highly specialized deployments that meet specific enterprise requirements.

The examples provided demonstrate practical patterns for extending Milvus functionality while maintaining system stability and performance. As vector databases continue to evolve, the ability to customize and extend core functionality through plugins will remain a critical capability for enterprise AI applications.

Remember to thoroughly test plugins in development environments and implement proper monitoring to ensure production stability. The investment in custom plugins pays dividends in operational efficiency and system integration capabilities.




About MinervaDB Corporation
Full-stack database infrastructure architecture, engineering, and operations consultative support (24x7) provider for PostgreSQL, MySQL, MariaDB, MongoDB, ClickHouse, Trino, SQL Server, Cassandra, CockroachDB, Yugabyte, Couchbase, Redis, Valkey, NoSQL, NewSQL, Databricks, Amazon Redshift, Amazon Aurora, CloudSQL, Snowflake, and AzureSQL, with core expertise in performance, scalability, high availability, database reliability engineering, database upgrades and migration, and data security.
