コンテンツにスキップ

InfluxDB

Created: 2023/06/15

概要

InfluxDBを試しつつ情報を蓄積します。

#とは

InfluxDB Times Series Data Platform | InfluxData

InfluxDB. It's About Time. Manage all types of time series data in a single, purpose-built database. Run at any scale in any environment in the cloud, on-premises, or at the edge.

オープンソースの時系列データベースです。センシングデータなどの時間と紐づいたデータを扱えます。

動かす

Dockerで簡単に動きます。

sudo docker run -p 8086:8086 -v myInfluxVolume:/var/lib/influxdb2 -d -t influxdb influxdb:latest

セットアップが終わるとGUIが表示されます。

alt

使う (Influx CLI)

Dockerコンテナにクライアントが入っています。

$ sudo docker exec -it influxdb influx version
Influx CLI dev (git: none) build_date: 2023-04-28T14:24:14Z
  • bucket ... Bucket management commands
  • export ... Export resources as a template
  • help ... List all commands
  • help [command] ... Get help with an individual command
  • query ... Execute a query
  • write ... Write points to InfluxDB

Webクライアントにヘルプがあったので参考にCLIを触ってみます。ここから$ sudo docker exec -it influxdb bashでコンテナ内のシェルで作業します。

Configure an InfluxDB profile

InfluxDBの接続を構成します。Webクライアントのチュートリアルがサンプルをくれます。

$ influx config create --config-name onboarding \
  --host-url "http://[InfluxDB_Host]:8086" \
  --org "[ORG]" \
  --token "[Token]" \
  --active
Active  Name            URL                             Org
*       onboarding      http://[InfluxDB_Host]:8086      [ORG]

--org "[ORG]"はIDではなくNameの方を使いましょう。でないとError: failed to lookup org with name "[org_id]": 404 Not Found: organization name "[org_id]" not foundと怒られます。

Select or Create a bucket

Bucketを作ります。

alt

$ influx bucket create --name sample-bucket -c onboarding

Write Data

Bucketにデータを書き込みます。サンプルのCSVをAmazon S3に用意してくれている(優しい!)ので使います。

$ influx write --bucket sample-bucket --url https://influx-testdata.s3.amazonaws.com/air-sensor-data-annotated.csv

alt

Execute a Flux Query

データベースなのでクエリが存在します。以前はInflux QueryというSQLライクなクエリ言語だったそうですが、Fluxが新しいバージョン(2.x ~)では主流のようです。

サンプルスクリプトは下記のとおりです。なんだか見慣れない形ですが、データを処理することに特化しているためこの様になっているみたいです。

from(bucket: “weather-data”)
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == “temperature”)

で、CLIで実行するときは次のようなコマンドになります。ワッと結果が出力されます。

$ influx query "from(bucket:\"sample-bucket\") |> range(start:-30m)"
Result: _result
Table: keys: [_start, _stop, _field, _measurement, sensor_id]
                   _start:time                      _stop:time           _field:string     _measurement:string        sensor_id:string                      _time:time                  _value:float
------------------------------  ------------------------------  ----------------------  ----------------------  ----------------------  ------------------------------  ----------------------------
2023-06-15T14:20:55.732612670Z  2023-06-15T14:50:55.732612670Z                      co              airSensors       TLM0100  2023-06-15T14:20:56.000000000Z            0.6540886812204695
2023-06-15T14:20:55.732612670Z  2023-06-15T14:50:55.732612670Z                      co              airSensors       TLM0100  2023-06-15T14:21:06.000000000Z            0.6655087367528416
...

集計もできるみたいです。便利~!

$ influx query "from(bucket:\"sample-bucket\") |> range(start:-30m) |> mean()"
Result: _result
Table: keys: [_start, _stop, _field, _measurement, sensor_id]
                   _start:time                      _stop:time           _field:string     _measurement:string        sensor_id:string                  _value:float
------------------------------  ------------------------------  ----------------------  ----------------------  ----------------------  ----------------------------
2023-06-15T14:22:28.665743801Z  2023-06-15T14:52:28.665743801Z                      co              airSensors                 TLM0100            0.6622839258689658
Table: keys: [_start, _stop, _field, _measurement, sensor_id]
                   _start:time                      _stop:time           _field:string     _measurement:string        sensor_id:string                  _value:float
------------------------------  ------------------------------  ----------------------  ----------------------  ----------------------  ----------------------------
2023-06-15T14:22:28.665743801Z  2023-06-15T14:52:28.665743801Z                humidity              airSensors                 TLM0100             35.78526123104399
...

使う (Python)

influxdb-clientという公式ライブラリがあります。自分の環境にインストールしておきましょう。

pip install influxdb-client

接続

WebUIからトークンを取得します。

alt

チュートリアルでは環境変数にトークンを設定し、そこから取るようです。同じようにやりましょう。

export INFLUXDB_TOKEN=****

次のコードでは、環境変数から取ってきたトークン、org(要調査)、URLを利用してクライアントをインスタンス化します。

import influxdb_client, os, time
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

token = os.environ.get("INFLUXDB_TOKEN")
org = "kokshilab"
url = "http://[HOST]:[PORT]"

write_client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)

Write API

InfluxDBにデータを書き込むときはInfluxDBClient.write_apiを使います。チュートリアルそのままだと実行できないので変えます。

bucket="sample-bucket"

# write_api = client.write_api(write_options=SYNCHRONOUS)
write_api = write_client.write_api(write_options=SYNCHRONOUS)

for value in range(5):
  point = (
    Point("measurement1")
    .tag("tagname1", "tagvalue1")
    .field("field1", value)
  )
  write_api.write(bucket=bucket, org="kokshilab", record=point)
  time.sleep(1) # separate points by 1 second

Query API

データを取るときはInfluxDBClient.query_apiを利用します。Fluxを利用していろいろすることが出来ます。

# query_api = client.query_api()
query_api = write_client.query_api()

query = """from(bucket: "sample-bucket")
 |> range(start: -10m)
 |> filter(fn: (r) => r._measurement == "measurement1")"""
tables = query_api.query(query, org="kokshilab")

for table in tables:
  for record in table.records:
    print(record)