// Copyright 2020 InfluxData, Inc. All rights reserved. // Use of this source code is governed by MIT // license that can be found in the LICENSE file. // Package influxdb2 provides API for using InfluxDB client in Go. // It's intended to use with InfluxDB 2 server. WriteAPI, QueryAPI and Health work also with InfluxDB 1.8 package influxdb2 import ( "context" "errors" "strings" "sync" "github.com/influxdata/influxdb-client-go/v2/api" "github.com/influxdata/influxdb-client-go/v2/api/http" "github.com/influxdata/influxdb-client-go/v2/domain" ilog "github.com/influxdata/influxdb-client-go/v2/internal/log" "github.com/influxdata/influxdb-client-go/v2/log" ) // Client provides API to communicate with InfluxDBServer. // There two APIs for writing, WriteAPI and WriteAPIBlocking. // WriteAPI provides asynchronous, non-blocking, methods for writing time series data. // WriteAPIBlocking provides blocking methods for writing time series data. type Client interface { // Setup sends request to initialise new InfluxDB server with user, org and bucket, and data retention period // and returns details about newly created entities along with the authorization object. // Retention period of zero will result to infinite retention. Setup(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int) (*domain.OnboardingResponse, error) // Ready checks InfluxDB server is running. It doesn't validate authentication params. Ready(ctx context.Context) (bool, error) // Health returns an InfluxDB server health check result. Read the HealthCheck.Status field to get server status. // Health doesn't validate authentication params. Health(ctx context.Context) (*domain.HealthCheck, error) // Close ensures all ongoing asynchronous write clients finish. // Also closes all idle connections, in case of HTTP client was created internally. Close() // Options returns the options associated with client Options() *Options // ServerURL returns the url of the server url client talks to ServerURL() string // HTTPService returns underlying HTTP service object used by client HTTPService() http.Service // WriteAPI returns the asynchronous, non-blocking, Write client WriteAPI(org, bucket string) api.WriteAPI // WriteAPIBlocking returns the synchronous, blocking, Write client WriteAPIBlocking(org, bucket string) api.WriteAPIBlocking // QueryAPI returns Query client QueryAPI(org string) api.QueryAPI // AuthorizationsAPI returns Authorizations API client. AuthorizationsAPI() api.AuthorizationsAPI // OrganizationsAPI returns Organizations API client OrganizationsAPI() api.OrganizationsAPI // UsersAPI returns Users API client. UsersAPI() api.UsersAPI // DeleteAPI returns Delete API client DeleteAPI() api.DeleteAPI // BucketsAPI returns Buckets API client BucketsAPI() api.BucketsAPI // LabelsAPI returns Labels API client LabelsAPI() api.LabelsAPI // TasksAPI returns Tasks API client TasksAPI() api.TasksAPI } // clientImpl implements Client interface type clientImpl struct { serverURL string options *Options writeAPIs map[string]api.WriteAPI syncWriteAPIs map[string]api.WriteAPIBlocking lock sync.Mutex httpService http.Service apiClient *domain.ClientWithResponses authAPI api.AuthorizationsAPI orgAPI api.OrganizationsAPI usersAPI api.UsersAPI deleteAPI api.DeleteAPI bucketsAPI api.BucketsAPI labelsAPI api.LabelsAPI tasksAPI api.TasksAPI } // NewClient creates Client for connecting to given serverURL with provided authentication token, with the default options. // serverURL is the InfluxDB server base URL, e.g. http://localhost:8086, // authToken is an authentication token. It can be empty in case of connecting to newly installed InfluxDB server, which has not been set up yet. // In such case, calling Setup() will set the authentication token. func NewClient(serverURL string, authToken string) Client { return NewClientWithOptions(serverURL, authToken, DefaultOptions()) } // NewClientWithOptions creates Client for connecting to given serverURL with provided authentication token // and configured with custom Options. // serverURL is the InfluxDB server base URL, e.g. http://localhost:8086, // authToken is an authentication token. It can be empty in case of connecting to newly installed InfluxDB server, which has not been set up yet. // In such case, calling Setup() will set authentication token func NewClientWithOptions(serverURL string, authToken string, options *Options) Client { normServerURL := serverURL if !strings.HasSuffix(normServerURL, "/") { // For subsequent path parts concatenation, url has to end with '/' normServerURL = serverURL + "/" } authorization := "" if len(authToken) > 0 { authorization = "Token " + authToken } service := http.NewService(normServerURL, authorization, options.httpOptions) client := &clientImpl{ serverURL: serverURL, options: options, writeAPIs: make(map[string]api.WriteAPI, 5), syncWriteAPIs: make(map[string]api.WriteAPIBlocking, 5), httpService: service, apiClient: domain.NewClientWithResponses(service), } if log.Log != nil { log.Log.SetLogLevel(options.LogLevel()) } ilog.Infof("Using URL '%s', token '%s'", serverURL, authToken) return client } func (c *clientImpl) Options() *Options { return c.options } func (c *clientImpl) ServerURL() string { return c.serverURL } func (c *clientImpl) HTTPService() http.Service { return c.httpService } func (c *clientImpl) Ready(ctx context.Context) (bool, error) { params := &domain.GetReadyParams{} response, err := c.apiClient.GetReadyWithResponse(ctx, params) if err != nil { return false, err } if response.JSONDefault != nil { return false, domain.DomainErrorToError(response.JSONDefault, response.StatusCode()) } return true, nil } func (c *clientImpl) Setup(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int) (*domain.OnboardingResponse, error) { if username == "" || password == "" { return nil, errors.New("a username and password is required for a setup") } c.lock.Lock() defer c.lock.Unlock() params := &domain.PostSetupParams{} body := &domain.PostSetupJSONRequestBody{ Bucket: bucket, Org: org, Password: &password, RetentionPeriodHrs: &retentionPeriodHours, Username: username, } response, err := c.apiClient.PostSetupWithResponse(ctx, params, *body) if err != nil { return nil, err } if response.JSONDefault != nil { return nil, domain.DomainErrorToError(response.JSONDefault, response.StatusCode()) } c.httpService.SetAuthorization("Token " + *response.JSON201.Auth.Token) return response.JSON201, nil } func (c *clientImpl) Health(ctx context.Context) (*domain.HealthCheck, error) { params := &domain.GetHealthParams{} response, err := c.apiClient.GetHealthWithResponse(ctx, params) if err != nil { return nil, err } if response.JSONDefault != nil { return nil, domain.DomainErrorToError(response.JSONDefault, response.StatusCode()) } if response.JSON503 != nil { //unhealthy server return response.JSON503, nil } return response.JSON200, nil } func createKey(org, bucket string) string { return org + "\t" + bucket } func (c *clientImpl) WriteAPI(org, bucket string) api.WriteAPI { c.lock.Lock() defer c.lock.Unlock() key := createKey(org, bucket) if _, ok := c.writeAPIs[key]; !ok { w := api.NewWriteAPI(org, bucket, c.httpService, c.options.writeOptions) c.writeAPIs[key] = w } return c.writeAPIs[key] } func (c *clientImpl) WriteAPIBlocking(org, bucket string) api.WriteAPIBlocking { c.lock.Lock() defer c.lock.Unlock() key := createKey(org, bucket) if _, ok := c.syncWriteAPIs[key]; !ok { w := api.NewWriteAPIBlocking(org, bucket, c.httpService, c.options.writeOptions) c.syncWriteAPIs[key] = w } return c.syncWriteAPIs[key] } func (c *clientImpl) Close() { for key, w := range c.writeAPIs { wa := w.(*api.WriteAPIImpl) wa.Close() delete(c.writeAPIs, key) } for key := range c.syncWriteAPIs { delete(c.syncWriteAPIs, key) } if c.options.HTTPOptions().OwnHTTPClient() { c.options.HTTPOptions().HTTPClient().CloseIdleConnections() } } func (c *clientImpl) QueryAPI(org string) api.QueryAPI { return api.NewQueryAPI(org, c.httpService) } func (c *clientImpl) AuthorizationsAPI() api.AuthorizationsAPI { c.lock.Lock() defer c.lock.Unlock() if c.authAPI == nil { c.authAPI = api.NewAuthorizationsAPI(c.apiClient) } return c.authAPI } func (c *clientImpl) OrganizationsAPI() api.OrganizationsAPI { c.lock.Lock() defer c.lock.Unlock() if c.orgAPI == nil { c.orgAPI = api.NewOrganizationsAPI(c.apiClient) } return c.orgAPI } func (c *clientImpl) UsersAPI() api.UsersAPI { c.lock.Lock() defer c.lock.Unlock() if c.usersAPI == nil { c.usersAPI = api.NewUsersAPI(c.apiClient, c.httpService, c.options.HTTPClient()) } return c.usersAPI } func (c *clientImpl) DeleteAPI() api.DeleteAPI { c.lock.Lock() defer c.lock.Unlock() if c.deleteAPI == nil { c.deleteAPI = api.NewDeleteAPI(c.apiClient) } return c.deleteAPI } func (c *clientImpl) BucketsAPI() api.BucketsAPI { c.lock.Lock() defer c.lock.Unlock() if c.bucketsAPI == nil { c.bucketsAPI = api.NewBucketsAPI(c.apiClient) } return c.bucketsAPI } func (c *clientImpl) LabelsAPI() api.LabelsAPI { c.lock.Lock() defer c.lock.Unlock() if c.labelsAPI == nil { c.labelsAPI = api.NewLabelsAPI(c.apiClient) } return c.labelsAPI } func (c *clientImpl) TasksAPI() api.TasksAPI { c.lock.Lock() defer c.lock.Unlock() if c.tasksAPI == nil { c.tasksAPI = api.NewTasksAPI(c.apiClient) } return c.tasksAPI }