JavaScriptを有効にしてください

PythonとPandasでInfluxDBを操作する

 ·   6 min read

はじめに

PythonとPandasを使って、時系列データベースInfluxDBを操作する方法についてまとめた。

InfluxDBは時系列データの扱いに特化したデータベースである。概要は以下の記事を参照。
時系列データベースInfluxDB入門

PythonでInfluxDBを操作するには、influx-pythonというライブラリを使う。このライブラリは、InfluxDBを開発したinfluxdata社が公開しているものである。influx-pythonを使うと、PandasのDataFrameでInfluxDBとデータを授受できる。

環境は以下の通り。

  • Linux Mint 19.3 (Cinnamon)
  • InfluxDB 1.8.0

また、Pythonのバージョンは以下の通り。

バージョン
Python 3.7.6
NumPy 1.18.1
Pandas 1.0.1
Influxdb-python 5.2.2

ライブラリのインストール

PythonからInfluxDBを操作するライブラリinflux-pythonをインストールする。

condaの場合:

conda install -c pdrops influxdb

pipの場合:

pip install influxdb

以降では、ライブラリを以下のようにインポートすることを前提とする。

1
2
3
import numpy as np
import pandas as pd
import influxdb

influx-pythonの基本

influx-pythonでInfluxDBを操作するために、次の2つのクラスが用意されている。

  • InfluxDBClient: JSONを使う
  • DataFrameClient: PandasのDataFrameを使う

本記事では、DataFrameClientを扱う。

DataFrameClientクラス

DataFrameClientクラスの主な引数は以下の通り。

1
2
influxdb.DataFrameClient(host=u'localhost', port=8086, username=u'root',
                         password=u'root', database=None)

hostはアクセス先のInfluxDBがあるPCのIPアドレスである。同じPCの場合はデフォルト値のlocalhostでよい。

portはInfluxDBのポート番号である。8086はInfluxDBのデフォルトのポート番号である。

username, passwordはInfluxDBにアクセスするためのユーザ名、パスワードである。‘root’はInfluxDBのデフォルト値と同じである。

databaseは接続先のデータベース名である。後ほどメソッド実行時に指定できるので、DataFrameClientオブジェクト作成時に指定する必要はない。

DataFrameClientクラスの主なメソッドは次の通り。

メソッド名 説明
create_database(<データベース名>) データベースを作成する
get_list_database() データベース一覧を取得する
get_list_measurements() measurement一覧を取得する
drop_database(<データベース名>) データベースを削除する
drop_measurement(<measurement名>) measurementを削除する
write_points() DataFrameを書き込む
query() データを取得する

write_points()query()については、以降で詳しく述べる。

InfluxDBへの接続

DataFrameClientクラスを使って、InfluxDBへ接続する。まず、DataFrameClientオブジェクトを作成し、pd_testという名前のデータベースを作成する。

1
2
client = influxdb.DataFrameClient()
client.create_database("pd_test")

データベースを既に作成している場合は、以下のようにしても良い。

1
client = influxdb.DataFrameClient(database="pd_test")

DataFrameをInfluxDBに書き込む

PandasのDataFrameをInfluxDBに書き込むには、write_pointsメソッドを用いる。

1
DataFrameClient.write_points(dataframe, measurement,database=None)

引数の説明は次の通り。

  • dataframe: PandasのDataFrame
  • measurement: データを追加するmeasurement(表の名前)
  • database: データベース名。

ここで、DataFrameClientオブジェクトにデータベースが設定されていない場合、databaseは必須である。databaseを指定しないと、write_pointsメソッド実行時に以下のエラーが出る。

1
nfluxDBClientError: 400: {"error":"database is required"}

一方、DataFrameClientオブジェクトにデータベースが設定されており、かつwrite_pointsメソッドで指定しない場合、オブジェクトに設定されたデータベースにデータが追加される。

例として、PandasのDataFrameを、先ほど作成したpd_testデータベースに追加する。DataFrameは、10行×2列の大きさで、2020年4月1日から1日周期でインデックスを振っている。ここで、measurement名はmeas1とした。

1
2
3
4
5
array = np.arange(20).reshape(-1, 2)
index = pd.date_range(pd.Timestamp("20200401"), freq="1D", periods=10)
df = pd.DataFrame(array, index=index, columns=["A", "B"])  

client.write_points(df, "meas1", database="pd_test")

DataFrameをInfluxDBから取得する

InfluxDBからデータを取得するには、queryメソッドを用いる。

1
DataFrameClient.query(query, database=None)

引数の説明は以下の通り。

  • query: InfluxDBのクエリ文。
  • database: データベース名。

クエリ文の構文の詳細については、別記事で説明予定。

ここで、DataFrameClientオブジェクトにデータベースが設定されていない場合、databaseは必須である。一方、DataFrameClientオブジェクトにデータベースが設定されており、かつwrite_pointsメソッドで指定しない場合、オブジェクトに設定されたデータベースにデータが追加される。

また、queryメソッドの戻り値は辞書型(正確にはdefaultdict)であり、キーがmeasurement, 値がデータフレームになる。

例として、先程write_pointsメソッドで書き込んだデータを取得する。

1
2
3
q1 = "SELECT * FROM meas1"
res = client.query(q1, database="pd_test")
print(res)

ここで、クエリ文

SELECT * FROM meas1

は、measurement meas1の全てのfieldを取得することを表す。

実行結果
queryメソッドの戻り値resは辞書型であり、キーがmeasurement meas1, 値がDataFrameとなる。また、DataFrameのindexDateTimeIndex, columnsはInfluxDBのfieldとなる。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
defaultdict(<class 'list'>, {'meas1':                             A   B
2020-04-01 00:00:00+00:00   0   1
2020-04-02 00:00:00+00:00   2   3
2020-04-03 00:00:00+00:00   4   5
2020-04-04 00:00:00+00:00   6   7
2020-04-05 00:00:00+00:00   8   9
2020-04-06 00:00:00+00:00  10  11
2020-04-07 00:00:00+00:00  12  13
2020-04-08 00:00:00+00:00  14  15
2020-04-09 00:00:00+00:00  16  17
2020-04-10 00:00:00+00:00  18  19})

なお、DataFrameを直接取得する場合には、

df = list(res.values())[0]

などのようにする。

参考

Welcome to InfluxDB’s documentation! — InfluxDB 5.3.1 documentation
時系列データベースInfluxDB入門

シェアする

Helve
WRITTEN BY
Helve
関西在住、電機メーカ勤務のエンジニア。X(旧Twitter)で新着記事を配信中です

サイト内検索