NAME AnyEvent::InfluxDB - An asynchronous library for InfluxDB time-series database VERSION version 0.02 SYNOPSIS use EV; use AnyEvent; use AnyEvent::Socket; use AnyEvent::Handle; use AnyEvent::InfluxDB; use Monitoring::Plugin::Performance; my $db = AnyEvent::InfluxDB->new( server => 'http://localhost:8086', username => 'admin', password => 'password', ); my $hdl; tcp_server undef, 8888, sub { my ($fh, $host, $port) = @_; $hdl = AnyEvent::Handle->new( fh => $fh, ); $hdl->push_read( line => sub { my (undef, $line) = @_; # Disk\t/=382MB;15264;15269;; /var=218MB;9443;9448 my ($measurement, $perfstring) = split(/\t/, $line); my @perfdata = Monitoring::Plugin::Performance->parse_perfstring($perfstring); $db->write( database => 'mydb', data => [ map { +{ measurement => $measurement, tags => { label => $_->label, }, fields => { value => $_->value, uom => '"'. $_->uom .'"', }, } } @perfdata ], on_success => sub { print "$line written\n"; }, on_error => sub { print "$line error: @_\n"; }, ); $hdl->on_drain( sub { $hdl->fh->close; undef $hdl; } ); }, ); }; EV::run; DESCRIPTION Asynchronous client library for InfluxDB time-series database <https://influxdb.com>. This version is meant to be used with InfluxDB v0.10 and newer. METHODS new my $db = AnyEvent::InfluxDB->new( server => 'http://localhost:8086', username => 'admin', password => 'password', ); Returns object representing given server "server" connected using optionally provided username "username" and password "password". Default value of "server" is "http://localhost:8086". If the server protocol is "https" then by default no validation of remote host certificate is performed. This can be changed by setting "ssl_options" parameter with any options accepted by AnyEvent::TLS. my $db = AnyEvent::InfluxDB->new( server => 'https://localhost:8086', username => 'admin', password => 'password', ssl_options => { verify => 1, verify_peername => 'https', ca_file => '/path/to/cacert.pem', } ); As an debugging aid the "on_request" code reference may also be provided. It will be executed before each request with the method name, url and POST data if set. my $db = AnyEvent::InfluxDB->new( on_request => sub { my ($method, $url, $post_data) = @_; print "$method $url\n"; print "$post_data\n" if $post_data; } ); ping $cv = AE::cv; $db->ping( wait_for_leader => 2, on_success => $cv, on_error => sub { $cv->croak("Failed to ping cluster leader: @_"); } ); my $version = $cv->recv; Check the leader of the cluster to ensure that the leader is available and ready. The optional parameter "wait_for_leader" specifies the number of seconds to wait before returning a response. The required "on_success" code reference is executed if request was successful with the value of "X-Influxdb-Version" response header as argument, otherwise executes the required "on_error" code reference with the value of "Reason" response header as argument. Database Management create_database $cv = AE::cv; $db->create_database( # raw query q => "CREATE DATABASE IF NOT EXISTS mydb", # or query created from arguments database => "mydb", if_not_exists => 1, # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to create database: @_"); } ); $cv->recv; Creates specified by "database" argument database. The optional parameter "if_not_exists" if set to true value, allows to avoid error if the database already exists. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. drop_database $cv = AE::cv; $db->drop_database( # raw query q => "DROP DATABASE IF EXISTS mydb", # or query created from arguments database => "mydb", if_exists => 1, # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to drop database: @_"); } ); $cv->recv; Drops specified by "database" argument database. The optional parameter "if_exists" if set to true value, allows to avoid error if the database does not exists. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_databases $cv = AE::cv; $db->show_databases( on_success => $cv, on_error => sub { $cv->croak("Failed to list databases: @_"); } ); my @db_names = $cv->recv; print "$_\n" for @db_names; Returns list of known database names. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_shards $cv = AE::cv; $db->show_shards( on_success => $cv, on_error => sub { $cv->croak("Failed to list shards: @_"); } ); my $shards = $cv->recv; for my $database ( sort keys %{ $shards } ) { print "Database: $database\n"; for my $s ( @{ $shards->{$database} } ) { print " * $_: $s->{$_}\n" for sort keys %{ $s }; } } Returns a hash reference with database name as keys and their shards as values. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_shard_groups $cv = AE::cv; $db->show_shard_groups( on_success => $cv, on_error => sub { $cv->croak("Failed to list shard groups: @_"); } ); my @shard_groups = $cv->recv; for my $sg ( @shard_groups ) { print "ID: $sg->{id}\n"; print "Database: $sg->{database}\n"; print "Retention Policy: $sg->{retention_policy}\n"; print "Start Time: $sg->{start_time}\n"; print "End Time: $sg->{end_time}\n"; print "Expiry Time: $sg->{expiry_time}\n"; } Returns a list of hash references with keys "name", "duration", "replicaN" and "default" for each replication policy defined on database "database". Returns a hash reference with database name as keys and their shards as values. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. Retention Policy Management create_retention_policy $cv = AE::cv; $db->create_retention_policy( # raw query q => "CREATE RETENTION POLICY last_day ON mydb DURATION 1d REPLICATION 1", # or query created from arguments name => 'last_day', database => 'mydb', duration => '1d', replication => 1, default => 0, # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to create retention policy: @_"); } ); $cv->recv; Creates new retention policy named by "name" on database "database" with duration "duration" and replication factor "replication". If "default" is provided and true the created retention policy becomes the default one. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. alter_retention_policy $cv = AE::cv; $db->alter_retention_policy( # raw query q => "ALTER RETENTION POLICY last_day ON mydb DURATION 1d REPLICATION 1 DEFAULT", # or query created from arguments name => 'last_day', database => 'mydb', duration => '1d', replication => 1, default => 1, # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to alter retention policy: @_"); } ); $cv->recv; Modifies retention policy named by "name" on database "database". At least one of duration "duration", replication factor "replication" or flag "default" must be set. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_retention_policies $cv = AE::cv; $db->show_retention_policies( # raw query q => "SHOW RETENTION POLICIES ON mydb", # or query created from arguments database => 'mydb', # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to list retention policies: @_"); } ); my @retention_policies = $cv->recv; for my $rp ( @retention_policies ) { print "Name: $rp->{name}\n"; print "Duration: $rp->{duration}\n"; print "Replication factor: $rp->{replicaN}\n"; print "Default?: $rp->{default}\n"; } Returns a list of hash references with keys "name", "duration", "replicaN" and "default" for each replication policy defined on database "database". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. drop_retention_policy $cv = AE::cv; $db->drop_retention_policy( # raw query q => "DROP RETENTION POLICY last_day ON mydb", # or query created from arguments name => "last_day", database => "mydb", # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to drop retention policy: @_"); } ); $cv->recv; Drops specified by "name" retention policy on database "database". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. User Management create_user $cv = AE::cv; $db->create_user( # raw query q => "CREATE USER jdoe WITH PASSWORD 'mypassword' WITH ALL PRIVILEGES", # or query created from arguments username => 'jdoe', password => 'mypassword', all_privileges => 1, # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to create user: @_"); } ); $cv->recv; Creates user with "username" and "password". If flag "all_privileges" is set to true created user will be granted cluster administration privileges. Note: "password" will be automatically enclosed in single quotes. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. set_user_password $cv = AE::cv; $db->set_user_password( # raw query q => "SET PASSWORD FOR jdoe = 'otherpassword'", # or query created from arguments username => 'jdoe', password => 'otherpassword', # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to set password: @_"); } ); $cv->recv; Sets password to "password" for the user identified by "username". Note: "password" will be automatically enclosed in single quotes. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_users $cv = AE::cv; $db->show_users( on_success => $cv, on_error => sub { $cv->croak("Failed to list users: @_"); } ); my @users = $cv->recv; for my $u ( @users ) { print "Name: $u->{user}\n"; print "Admin?: $u->{admin}\n"; } Returns a list of hash references with keys "user" and "admin" for each defined user. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. grant_privileges $cv = AE::cv; $db->grant_privileges( # raw query q => "GRANT ALL ON mydb TO jdoe", # or query created from arguments username => 'jdoe', # privileges at single database database => 'mydb', access => 'ALL', # or to grant cluster administration privileges all_privileges => 1, # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to grant privileges: @_"); } ); $cv->recv; Grants to user "username" access "access" on database "database". If flag "all_privileges" is set it grants cluster administration privileges instead. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_grants $cv = AE::cv; $db->show_grants( # raw query q => "SHOW GRANTS FOR jdoe", # or query created from arguments username => 'jdoe', # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to list users: @_"); } ); my @grants = $cv->recv; for my $g ( @grants ) { print "Database: $g->{database}\n"; print "Privilege: $g->{privilege}\n"; } Returns a list of hash references with keys "database" and "privilege" describing the privileges granted for database to given user. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. revoke_privileges $cv = AE::cv; $db->revoke_privileges( # raw query q => "REVOKE WRITE ON mydb FROM jdoe", # or query created from arguments username => 'jdoe', # privileges at single database database => 'mydb', access => 'WRITE', # or to revoke cluster administration privileges all_privileges => 1, # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to revoke privileges: @_"); } ); $cv->recv; Revokes from user "username" access "access" on database "database". If flag "all_privileges" is set it revokes cluster administration privileges instead. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. drop_user $cv = AE::cv; $db->drop_user( # raw query q => "DROP USER jdoe", # or query created from arguments username => 'jdoe', # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to drop user: @_"); } ); $cv->recv; Drops user "username". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. Schema Exploration show_measurements $cv = AE::cv; $db->show_measurements( database => 'mydb', # raw query q => "SHOW MEASUREMENTS WITH MEASUREMENT =~ /cpu_load.*/" ." WHERE host = 'server02'" ." ORDER BY region" ." LIMIT 10 OFFSET 3", # or query created from arguments measurement => '/cpu_load.*/', where => q{host = 'server02'}, order_by => 'region', limit => 10, offset => 3, # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to list measurements: @_"); } ); my @measurements = $cv->recv; print "$_\n" for @measurements; Returns names of measurements from database "database", optionally filtering the matching the measurements with regular expression "measurement" and filtered by optional "where" clause. If the "measurement" is not enclosed in "//" then it will be treated as name of the measurement. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. drop_measurement $cv = AE::cv; $db->drop_measurement( database => 'mydb', # raw query q => "DROP MEASUREMENT cpu_load", # or query created from arguments measurement => 'cpu_load', # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to drop measurement: @_"); } ); $cv->recv; Drops measurement "measurement". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_series $cv = AE::cv; $db->show_series( database => 'mydb', # raw query q => "SHOW SERIES FROM cpu_load" ." WHERE host = 'server02'" ." ORDER BY region" ." LIMIT 10 OFFSET 3", # or query created from arguments measurement => 'cpu_load', where => q{host = 'server02'}, order_by => 'region', limit => 10, offset => 3, # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to list series: @_"); } ); my $series = $cv->recv; for my $measurement ( sort keys %{ $series } ) { print "Measurement: $measurement\n"; for my $s ( @{ $series->{$measurement} } ) { print " * $_: $s->{$_}\n" for sort keys %{ $s }; } } Returns from database "database" and optional measurement "measurement", optionally filtered by the "where" clause, a hash reference with measurements as keys and their unique tag sets as values. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. drop_series $cv = AE::cv; $db->drop_series( database => 'mydb', # raw query q => "DROP SERIES FROM cpu_load WHERE host = 'server02'", # or query created from arguments measurement => 'cpu_load', where => q{host = 'server02'}, # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to drop measurement: @_"); } ); $cv->recv; Drops series from measurement "measurement" and/or filtered by "where" clause from database "database". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_tag_keys $cv = AE::cv; $db->show_tag_keys( database => 'mydb', # raw query q => "SHOW TAG KEYS FROM cpu_load WHERE host = 'server02' LIMIT 10 OFFSET 3", # or query created from arguments measurement => 'cpu_load', where => q{host = 'server02'}, limit => 10, offset => 3, # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to list tag keys: @_"); } ); my $tag_keys = $cv->recv; for my $measurement ( sort keys %{ $tag_keys } ) { print "Measurement: $measurement\n"; print " * $_\n" for @{ $tag_keys->{$measurement} }; } Returns a hash reference with measurements as keys and their unique tag keys as values from database "database" and optional measurement "measurement", optionally filtered by the "where" clause, grouped by "group_by" and number of results limited to "limit" with offset "offset". To limit number of returned series use "slimit" with offset "soffset". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_tag_values $cv = AE::cv; $db->show_tag_values( database => 'mydb', # raw query q => "SHOW TAG VALUES FROM cpu_load WITH KEY = 'host' WHERE host = 'server02'", # or query created from arguments measurement => 'cpu_load', # single key key => 'host', # or a list of keys keys => [qw( host region )], where => q{host = 'server02'}, # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to list tag values: @_"); } ); my $tag_values = $cv->recv; for my $tag_key ( sort keys %{ $tag_values } ) { print "Tag key: $tag_key\n"; print " * $_\n" for @{ $tag_values->{$tag_key} }; } Returns from database "database" and optional measurement "measurement", values from a single tag key "key" or a list of tag keys "keys", optionally filtered by the "where" clause, a hash reference with tag keys as keys and their unique tag values as values. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_field_keys $cv = AE::cv; $db->show_field_keys( database => 'mydb', # raw query q => "SHOW FIELD KEYS FROM cpu_load", # or query created from arguments measurement => 'cpu_load', # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to list field keys: @_"); } ); my $field_keys = $cv->recv; for my $measurement ( sort keys %{ $field_keys } ) { print "Measurement: $measurement\n"; print " * $_\n" for @{ $field_keys->{$measurement} }; } Returns field keys from all measurements from database "database". Single measurement can be specified with optional "measurement" parameter. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. Continuous Queries create_continuous_query $cv = AE::cv; $db->create_continuous_query( # raw query q => 'CREATE CONTINUOUS QUERY per5minutes ON mydb' .' RESAMPLE EVERY 10s FOR 10m' .' BEGIN' .' SELECT MEAN(value) INTO "cpu_load_per5m" FROM cpu_load GROUP BY time(5m)' .' END', # or query created from arguments database => 'mydb', name => 'per5minutes', every => '10s', for => '2m', query => 'SELECT MEAN(value) INTO "cpu_load_per5m" FROM cpu_load GROUP BY time(5m)', # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to create continuous query: @_"); } ); $cv->recv; Creates new continuous query named by "name" on database "database" using query "query". Optional "every" and "for" define the resampling times. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. drop_continuous_query $cv = AE::cv; $db->drop_continuous_query( # raw query q => 'DROP CONTINUOUS QUERY per5minutes ON mydb', # or query created from arguments database => 'mydb', name => 'per5minutes', # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to drop continuous query: @_"); } ); $cv->recv; Drops continuous query named by "name" on database "database". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_continuous_queries $cv = AE::cv; $db->show_continuous_queries( database => 'mydb', on_success => $cv, on_error => sub { $cv->croak("Failed to list continuous queries: @_"); } ); my $continuous_queries = $cv->recv; for my $database ( sort keys %{ $continuous_queries } ) { print "Database: $database\n"; for my $s ( @{ $continuous_queries->{$database} } ) { print " Name: $s->{name}\n"; print " Query: $s->{query}\n"; } } Returns a list of hash references with keys "name" and "query" for each continuous query defined on database "database". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. Managing Data write $cv = AE::cv; $db->write( database => 'mydb', precision => 's', rp => 'last_day', consistency => 'quorum', data => [ # line protocol formatted 'cpu_load,host=server02,region=eu-east sensor="top",value=0.64 1456097956', # or as a hash { measurement => 'cpu_load', tags => { host => 'server02', region => 'eu-east', }, fields => { value => '0.64', sensor => q{"top"}, }, time => time() } ], on_success => $cv, on_error => sub { $cv->croak("Failed to write data: @_"); } ); $cv->recv; Writes time-series data "data" to database "database" with optional parameters: retention policy "rp", time precision "precision" and consistency "consistency". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. The "data" can be specified as single scalar value or hash reference with required keys "measurement" and "fields" and optional "tags" and "time". Both can be also mixed and matched within an array reference. Scalar values are expected to be formatted using InfluxDB line protocol. All special characters need to be escaped. In that case you might want to use InfluxDB::LineProtocol: use InfluxDB::LineProtocol qw(dataline); ... $db->write( database => 'mydb', precision => 'n', data => [ dataline('CPU Load', 0.64, { "Region of the World" => "Eastern Europe", codename => "eu-east" }, 1437868012260500137) # which translates to 'CPU\ Load,Region\ of\ the\ World=Eastern\ Europe,codename=eu-east value=0.64 1437868012260500137', ], ... ); Querying Data select $cv = AE::cv; $db->select( database => 'mydb', # return time in Unix epoch format epoch => "s", # raw query q => "SELECT count(value) FROM cpu_load" ." WHERE region = 'eu-east' AND time > now() - 14d" ." GROUP BY time(1d) fill(none)" ." ORDER BY time DESC" ." LIMIT 10 OFFSET 3", # or query created from arguments fields => 'count(value)', measurement => 'cpu_load', where => "region = 'eu-east' AND time > now() - 14d", group_by => 'time(1d)', fill => 'none', order_by => 'time DESC', limit => 10, offset => 3, # downsample result to another database, retention policy and measurement into => 'otherdb."default".cpu_load_per5m', # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to select data: @_"); } ); my $results = $cv->recv; for my $row ( @{ $results } ) { print "Measurement: $row->{name}\n"; print "Values:\n"; for my $value ( @{ $row->{values} || [] } ) { print " * $_ = $value->{$_}\n" for keys %{ $value || {} }; } } Executes an select query on database "database" created from provided arguments measurement "measurement", fields to select "fields", optional "where" clause, grouped by "group_by" and empty values filled with "fill", ordered by "order_by" and number of results limited to "limit" with offset "offset". To limit number of returned series use "slimit" with offset "soffset". If "into" parameter is provided the result of the query will be copied to specified measurement. If "epoch" is provided the returned "time" value will in Unix epoch format. Optional "chunk_size" can be provided to override the default value of 10,000 datapoints. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. query $cv = AE::cv; $db->query( query => { db => 'mydb', q => 'SELECT * FROM cpu_load', }, on_response => $cv, ); my ($response_data, $response_headers) = $cv->recv; Executes an arbitrary query using provided in "query" arguments. The required "on_response" code reference is executed with the raw response data and headers as parameters. Kapacitor integration Subscriptions tell InfluxDB to send all the data it receives to Kapacitor. create_subscription $cv = AE::cv; $db->create_subscription( # raw query q => 'CREATE SUBSCRIPTION "alldata" ON "mydb"."default"' ." DESTINATIONS ANY 'udp://h1.example.com:9090', 'udp://h2.example.com:9090'", # or query created from arguments name => q{"alldata"}, database => q{"mydb"}, rp => q{"default"}, mode => "ANY", destinations => [ q{'udp://h1.example.com:9090'}, q{'udp://h2.example.com:9090'} ], # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to create subscription: @_"); } ); $cv->recv; Creates a new subscription "name" on database "database" with retention policy "rp" with mode "mode" to destinations provided as "destinations". The "destinations" could be either a single scalar value or array reference to a list of host. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. show_subscriptions $cv = AE::cv; $db->show_subscriptions( on_success => $cv, on_error => sub { $cv->croak("Failed to list shards: @_"); } ); my $subscriptions = $cv->recv; for my $database ( sort keys %{ $subscriptions } ) { print "Database: $database\n"; for my $s ( @{ $subscriptions->{$database} } ) { print " Name: $s->{name}\n"; print " Retention Policy: $s->{retention_policy}\n"; print " Mode: $s->{mode}\n"; print " Destinations:\n"; print " * $_\n" for @{ $s->{destinations} || [] }; } } Returns a hash reference with database name as keys and their shards as values. The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. drop_subscription $cv = AE::cv; $db->drop_subscription( # raw query q => 'DROP SUBSCRIPTION "alldata" ON "mydb"."default"', # or query created from arguments name => q{"alldata"}, database => q{"mydb"}, rp => q{"default"}, # callbacks on_success => $cv, on_error => sub { $cv->croak("Failed to drop subscription: @_"); } ); $cv->recv; Drops subscription "name" on database "database" with retention policy "rp". The required "on_success" code reference is executed if request was successful, otherwise executes the required "on_error" code reference. CAVEATS Following the optimistic nature of InfluxDB this modules does not validate any arguments. Also quoting and escaping special characters is to be done by the user of this library. AUTHOR Alex J. G. Burzyński <ajgb@cpan.org> COPYRIGHT AND LICENSE This software is copyright (c) 2015 by Alex J. G. Burzyński <ajgb@cpan.org>. This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.