InfluxDB
Created: 2023/06/15
概要
InfluxDBを試しつつ情報を蓄積します。
#とは
InfluxDB Times Series Data Platform | InfluxData
InfluxDB. It's About Time. Manage all types of time series data in a single, purpose-built database. Run at any scale in any environment in the cloud, on-premises, or at the edge.
オープンソースの時系列データベースです。センシングデータなどの時間と紐づいたデータを扱えます。
動かす
Dockerで簡単に動きます。
sudo docker run -p 8086:8086 -v myInfluxVolume:/var/lib/influxdb2 -d -t influxdb influxdb:latest
セットアップが終わるとGUIが表示されます。
使う (Influx CLI)
Dockerコンテナにクライアントが入っています。
$ sudo docker exec -it influxdb influx version
Influx CLI dev (git: none) build_date: 2023-04-28T14:24:14Z
bucket
... Bucket management commandsexport
... Export resources as a templatehelp
... List all commandshelp [command]
... Get help with an individual commandquery
... Execute a querywrite
... Write points to InfluxDB
Webクライアントにヘルプがあったので参考にCLIを触ってみます。ここから$ sudo docker exec -it influxdb bash
でコンテナ内のシェルで作業します。
Configure an InfluxDB profile
InfluxDBの接続を構成します。Webクライアントのチュートリアルがサンプルをくれます。
$ influx config create --config-name onboarding \
--host-url "http://[InfluxDB_Host]:8086" \
--org "[ORG]" \
--token "[Token]" \
--active
Active Name URL Org
* onboarding http://[InfluxDB_Host]:8086 [ORG]
--org "[ORG]"
はIDではなくNameの方を使いましょう。でないとError: failed to lookup org with name "[org_id]": 404 Not Found: organization name "[org_id]" not found
と怒られます。
Select or Create a bucket
Bucketを作ります。
$ influx bucket create --name sample-bucket -c onboarding
Write Data
Bucketにデータを書き込みます。サンプルのCSVをAmazon S3に用意してくれている(優しい!)ので使います。
$ influx write --bucket sample-bucket --url https://influx-testdata.s3.amazonaws.com/air-sensor-data-annotated.csv
Execute a Flux Query
データベースなのでクエリが存在します。以前はInflux QueryというSQLライクなクエリ言語だったそうですが、Fluxが新しいバージョン(2.x ~)では主流のようです。
サンプルスクリプトは下記のとおりです。なんだか見慣れない形ですが、データを処理することに特化しているためこの様になっているみたいです。
from(bucket: “weather-data”)
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == “temperature”)
で、CLIで実行するときは次のようなコマンドになります。ワッと結果が出力されます。
$ influx query "from(bucket:\"sample-bucket\") |> range(start:-30m)"
Result: _result
Table: keys: [_start, _stop, _field, _measurement, sensor_id]
_start:time _stop:time _field:string _measurement:string sensor_id:string _time:time _value:float
------------------------------ ------------------------------ ---------------------- ---------------------- ---------------------- ------------------------------ ----------------------------
2023-06-15T14:20:55.732612670Z 2023-06-15T14:50:55.732612670Z co airSensors TLM0100 2023-06-15T14:20:56.000000000Z 0.6540886812204695
2023-06-15T14:20:55.732612670Z 2023-06-15T14:50:55.732612670Z co airSensors TLM0100 2023-06-15T14:21:06.000000000Z 0.6655087367528416
...
集計もできるみたいです。便利~!
$ influx query "from(bucket:\"sample-bucket\") |> range(start:-30m) |> mean()"
Result: _result
Table: keys: [_start, _stop, _field, _measurement, sensor_id]
_start:time _stop:time _field:string _measurement:string sensor_id:string _value:float
------------------------------ ------------------------------ ---------------------- ---------------------- ---------------------- ----------------------------
2023-06-15T14:22:28.665743801Z 2023-06-15T14:52:28.665743801Z co airSensors TLM0100 0.6622839258689658
Table: keys: [_start, _stop, _field, _measurement, sensor_id]
_start:time _stop:time _field:string _measurement:string sensor_id:string _value:float
------------------------------ ------------------------------ ---------------------- ---------------------- ---------------------- ----------------------------
2023-06-15T14:22:28.665743801Z 2023-06-15T14:52:28.665743801Z humidity airSensors TLM0100 35.78526123104399
...
使う (Python)
influxdb-client
という公式ライブラリがあります。自分の環境にインストールしておきましょう。
pip install influxdb-client
接続
WebUIからトークンを取得します。
チュートリアルでは環境変数にトークンを設定し、そこから取るようです。同じようにやりましょう。
export INFLUXDB_TOKEN=****
次のコードでは、環境変数から取ってきたトークン、org(要調査)、URLを利用してクライアントをインスタンス化します。
import influxdb_client, os, time
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
token = os.environ.get("INFLUXDB_TOKEN")
org = "kokshilab"
url = "http://[HOST]:[PORT]"
write_client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)
Write API
InfluxDBにデータを書き込むときはInfluxDBClient.write_api
を使います。チュートリアルそのままだと実行できないので変えます。
bucket="sample-bucket"
# write_api = client.write_api(write_options=SYNCHRONOUS)
write_api = write_client.write_api(write_options=SYNCHRONOUS)
for value in range(5):
point = (
Point("measurement1")
.tag("tagname1", "tagvalue1")
.field("field1", value)
)
write_api.write(bucket=bucket, org="kokshilab", record=point)
time.sleep(1) # separate points by 1 second
Query API
データを取るときはInfluxDBClient.query_api
を利用します。Fluxを利用していろいろすることが出来ます。
# query_api = client.query_api()
query_api = write_client.query_api()
query = """from(bucket: "sample-bucket")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "measurement1")"""
tables = query_api.query(query, org="kokshilab")
for table in tables:
for record in table.records:
print(record)