Datasources

OpenTSDB

GoFr supports injecting OpenTSDB to facilitate interaction with OpenTSDB's REST APIs. Implementations adhering to the OpenTSDB interface can be registered with app.AddOpenTSDB(), enabling applications to leverage OpenTSDB for time-series data management through gofr.Context.

// OpenTSDB provides methods for GoFr applications to communicate with OpenTSDB
// through its REST APIs.
type OpenTSDB interface {
	// HealthChecker verifies if the OpenTSDB server is reachable.
	// Returns an error if the server is unreachable, otherwise nil.
	HealthChecker

	// PutDataPoints sends data to store metrics in OpenTSDB.
	//
	// Parameters:
	// - ctx: Context for managing request lifetime.
	// - data: A slice of DataPoint objects; must contain at least one entry.
	// - queryParam: Specifies the response format:
	//   - client.PutRespWithSummary: Requests a summary response.
	//   - client.PutRespWithDetails: Requests detailed response information.
	//   - Empty string (""): No additional response details.
	//
	// - res: A pointer to PutResponse, where the server's response will be stored.
	//
	// Returns:
	// - Error if parameters are invalid, response parsing fails, or if connectivity issues occur.
	PutDataPoints(ctx context.Context, data any, queryParam string, res any) error

	// QueryDataPoints retrieves data based on the specified parameters.
	//
	// Parameters:
	// - ctx: Context for managing request lifetime.
	// - param: An instance of QueryParam with query parameters for filtering data.
	// - res: A pointer to QueryResponse, where the server's response will be stored.
	QueryDataPoints(ctx context.Context, param any, res any) error

	// QueryLatestDataPoints fetches the latest data point(s).
	//
	// Parameters:
	// - ctx: Context for managing request lifetime.
	// - param: An instance of QueryLastParam with query parameters for the latest data point.
	// - res: A pointer to QueryLastResponse, where the server's response will be stored.
	QueryLatestDataPoints(ctx context.Context, param any, res any) error

	// GetAggregators retrieves available aggregation functions.
	//
	// Parameters:
	// - ctx: Context for managing request lifetime.
	// - res: A pointer to AggregatorsResponse, where the server's response will be stored.
	GetAggregators(ctx context.Context, res any) error

	// QueryAnnotation retrieves a single annotation.
	//
	// Parameters:
	// - ctx: Context for managing request lifetime.
	// - queryAnnoParam: A map of parameters for the annotation query, such as client.AnQueryStartTime, client.AnQueryTSUid.
	// - res: A pointer to AnnotationResponse, where the server's response will be stored.
	QueryAnnotation(ctx context.Context, queryAnnoParam map[string]any, res any) error

	// PostAnnotation creates or updates an annotation.
	//
	// Parameters:
	// - ctx: Context for managing request lifetime.
	// - annotation: The annotation to be created or updated.
	// - res: A pointer to AnnotationResponse, where the server's response will be stored.
	PostAnnotation(ctx context.Context, annotation any, res any) error

	// PutAnnotation creates or replaces an annotation.
	// Fields not included in the request will be reset to default values.
	//
	// Parameters:
	// - ctx: Context for managing request lifetime.
	// - annotation: The annotation to be created or replaced.
	// - res: A pointer to AnnotationResponse, where the server's response will be stored.
	PutAnnotation(ctx context.Context, annotation any, res any) error

	// DeleteAnnotation removes an annotation.
	//
	// Parameters:
	// - ctx: Context for managing request lifetime.
	// - annotation: The annotation to be deleted.
	// - res: A pointer to AnnotationResponse, where the server's response will be stored.
	DeleteAnnotation(ctx context.Context, annotation any, res any) error
}

Import the gofr's external driver for OpenTSDB:

go get gofr.dev/pkg/gofr/datasource/opentsdb

The following example demonstrates injecting an OpenTSDB instance into a GoFr application and using it to perform a health check on the OpenTSDB server.

package main

import (
	"context"
	"fmt"
	"math/rand/v2"
	"time"

	"gofr.dev/pkg/gofr"
	"gofr.dev/pkg/gofr/datasource/opentsdb"
)

func main() {
	app := gofr.New()

	// Initialize OpenTSDB connection
	app.AddOpenTSDB(opentsdb.New(opentsdb.Config{
		Host:             "localhost:4242",
		MaxContentLength: 4096,
		MaxPutPointsNum:  1000,
		DetectDeltaNum:   10,
	}))

	// Register routes
	app.GET("/health", opentsdbHealthCheck)
	app.POST("/write", writeDataPoints)
	app.GET("/query", queryDataPoints)
	// Run the app
	app.Run()
}

// Health check for OpenTSDB
func opentsdbHealthCheck(c *gofr.Context) (any, error) {
	res, err := c.OpenTSDB.HealthCheck(context.Background())
	if err != nil {
		return nil, err
	}
	return res, nil
}

// Write Data Points to OpenTSDB
func writeDataPoints(c *gofr.Context) (any, error) {
	PutDataPointNum := 4
	name := []string{"cpu", "disk", "net", "mem"}
	cpuDatas := make([]opentsdb.DataPoint, 0)

	tags := map[string]string{
		"host":      "gofr-host",
		"try-name":  "gofr-sample",
		"demo-name": "opentsdb-test",
	}

	for i := 0; i < PutDataPointNum; i++ {
		data := opentsdb.DataPoint{
			Metric:    name[i%len(name)],
			Timestamp: time.Now().Unix(),
			Value:     rand.Float64() * 100,
			Tags:      tags,
		}
		cpuDatas = append(cpuDatas, data)
	}

	resp := opentsdb.PutResponse{}

	err := c.OpenTSDB.PutDataPoints(context.Background(), cpuDatas, "details", &resp)
	if err != nil {
		return resp.Errors, err
	}

	return fmt.Sprintf("%v Data points written successfully", resp.Success), nil
}

// Query Data Points from OpenTSDB
func queryDataPoints(c *gofr.Context) (any, error) {
	st1 := time.Now().Unix() - 3600
	st2 := time.Now().Unix()

	queryParam := opentsdb.QueryParam{
		Start: st1,
		End:   st2,
	}

	name := []string{"cpu", "disk", "net", "mem"}
	subqueries := make([]opentsdb.SubQuery, 0)
	tags := map[string]string{
		"host":      "gofr-host",
		"try-name":  "gofr-sample",
		"demo-name": "opentsdb-test",
	}

	for _, metric := range name {
		subQuery := opentsdb.SubQuery{
			Aggregator: "sum",
			Metric:     metric,
			Tags:       tags,
		}
		subqueries = append(subqueries, subQuery)
	}

	queryParam.Queries = subqueries

	queryResp := &opentsdb.QueryResponse{}

	err := c.OpenTSDB.QueryDataPoints(c, &queryParam, queryResp)
	if err != nil {
		return nil, err
	}
	return queryResp.QueryRespCnts, nil
}
Previous
MongoDB