はじめに #
PythonとPandasを使って、時系列データベースInfluxDBを操作する方法についてまとめた。
InfluxDBは時系列データの扱いに特化したデータベースである。概要は以下の記事を参照。 時系列データベースInfluxDB入門
PythonでInfluxDBを操作するには、influx-pythonというライブラリを使う。このライブラリは、InfluxDBを開発したinfluxdata社が公開しているものである。influx-pythonを使うと、PandasのDataFrameでInfluxDBとデータを授受できる。
環境は以下の通り。
- Linux Mint 19.3 (Cinnamon)
- InfluxDB 1.8.0
また、Pythonのバージョンは以下の通り。
| バージョン | |
|---|---|
| Python | 3.7.6 |
| NumPy | 1.18.1 |
| Pandas | 1.0.1 |
| Influxdb-python | 5.2.2 |
ライブラリのインストール #
PythonからInfluxDBを操作するライブラリinflux-pythonをインストールする。
condaの場合:
conda install -c pdrops influxdb
pipの場合:
pip install influxdb
以降では、ライブラリを以下のようにインポートすることを前提とする。
import numpy as np
import pandas as pd
import influxdb
influx-pythonの基本 #
influx-pythonでInfluxDBを操作するために、次の2つのクラスが用意されている。
InfluxDBClient: JSONを使うDataFrameClient: PandasのDataFrameを使う
本記事では、DataFrameClientを扱う。
DataFrameClientクラス #
DataFrameClientクラスの主な引数は以下の通り。
influxdb.DataFrameClient(host=u'localhost', port=8086, username=u'root',
password=u'root', database=None)
hostはアクセス先のInfluxDBがあるPCのIPアドレスである。同じPCの場合はデフォルト値のlocalhostでよい。
portはInfluxDBのポート番号である。8086はInfluxDBのデフォルトのポート番号である。
username, passwordはInfluxDBにアクセスするためのユーザ名、パスワードである。‘root’はInfluxDBのデフォルト値と同じである。
databaseは接続先のデータベース名である。後ほどメソッド実行時に指定できるので、DataFrameClientオブジェクト作成時に指定する必要はない。
DataFrameClientクラスの主なメソッドは次の通り。
| メソッド名 | 説明 |
|---|---|
create_database(<データベース名>) |
データベースを作成する |
get_list_database() |
データベース一覧を取得する |
get_list_measurements() |
measurement一覧を取得する |
drop_database(<データベース名>) |
データベースを削除する |
drop_measurement(<measurement名>) |
measurementを削除する |
write_points() |
DataFrameを書き込む |
query() |
データを取得する |
write_points()とquery()については、以降で詳しく述べる。
InfluxDBへの接続 #
DataFrameClientクラスを使って、InfluxDBへ接続する。まず、DataFrameClientオブジェクトを作成し、pd_testという名前のデータベースを作成する。
client = influxdb.DataFrameClient()
client.create_database("pd_test")
データベースを既に作成している場合は、以下のようにしても良い。
client = influxdb.DataFrameClient(database="pd_test")
DataFrameをInfluxDBに書き込む #
PandasのDataFrameをInfluxDBに書き込むには、write_pointsメソッドを用いる。
DataFrameClient.write_points(dataframe, measurement,database=None)
引数の説明は次の通り。
dataframe: PandasのDataFramemeasurement: データを追加するmeasurement(表の名前)database: データベース名。
ここで、DataFrameClientオブジェクトにデータベースが設定されていない場合、databaseは必須である。databaseを指定しないと、write_pointsメソッド実行時に以下のエラーが出る。
nfluxDBClientError: 400: {"error":"database is required"}
一方、DataFrameClientオブジェクトにデータベースが設定されており、かつwrite_pointsメソッドで指定しない場合、オブジェクトに設定されたデータベースにデータが追加される。
例として、PandasのDataFrameを、先ほど作成したpd_testデータベースに追加する。DataFrameは、10行×2列の大きさで、2020年4月1日から1日周期でインデックスを振っている。ここで、measurement名はmeas1とした。
array = np.arange(20).reshape(-1, 2)
index = pd.date_range(pd.Timestamp("20200401"), freq="1D", periods=10)
df = pd.DataFrame(array, index=index, columns=["A", "B"])
client.write_points(df, "meas1", database="pd_test")
DataFrameをInfluxDBから取得する #
InfluxDBからデータを取得するには、queryメソッドを用いる。
DataFrameClient.query(query, database=None)
引数の説明は以下の通り。
query: InfluxDBのクエリ文。database: データベース名。
クエリ文の構文の詳細については、別記事で説明予定。
ここで、DataFrameClientオブジェクトにデータベースが設定されていない場合、databaseは必須である。一方、DataFrameClientオブジェクトにデータベースが設定されており、かつwrite_pointsメソッドで指定しない場合、オブジェクトに設定されたデータベースにデータが追加される。
また、queryメソッドの戻り値は辞書型(正確にはdefaultdict)であり、キーがmeasurement, 値がデータフレームになる。
例として、先程write_pointsメソッドで書き込んだデータを取得する。
q1 = "SELECT * FROM meas1"
res = client.query(q1, database="pd_test")
print(res)
ここで、クエリ文
SELECT * FROM meas1
は、measurement meas1の全てのfieldを取得することを表す。
実行結果:queryメソッドの戻り値resは辞書型であり、キーがmeasurement meas1, 値がDataFrameとなる。また、DataFrameのindexはDateTimeIndex, columnsはInfluxDBのfieldとなる。
defaultdict(<class 'list'>, {'meas1': A B
2020-04-01 00:00:00+00:00 0 1
2020-04-02 00:00:00+00:00 2 3
2020-04-03 00:00:00+00:00 4 5
2020-04-04 00:00:00+00:00 6 7
2020-04-05 00:00:00+00:00 8 9
2020-04-06 00:00:00+00:00 10 11
2020-04-07 00:00:00+00:00 12 13
2020-04-08 00:00:00+00:00 14 15
2020-04-09 00:00:00+00:00 16 17
2020-04-10 00:00:00+00:00 18 19})
なお、DataFrameを直接取得する場合には、
df = list(res.values())[0]
などのようにする。