概要
ざっくりいうと、FXや資金などの金融取引を電子的にやり取りするときにFIXプロトコルという専用の方式が使われています。
デリバティブ関連のシステムを扱っている会社なんかでは聞いたことがある人もいるかもしれません。
そのプロトコルを実装したオープンソースなライブラリとしてquickfixというのがあります。
quickfixはC++、Java、.NET (C#)、Pythonに対応しているようです。
今回はJavaにあたるQuickFIX/J(quickfixjとも書く)についてまとめていきます。
注意
今回書くのはQuickFIX/Jの中でも結構レガシーなバージョンについて扱いますので
最新バージョンを扱う予定のある方は最新バージョンではどうなっているのかを気にしながら
本記事は参考程度にご参照ください
バージョン
QuickFIX/Jでバージョンというと2つの意味があります。
①QuickFIX/Jのライブラリのバージョン
- Javaのバージョン次第で動作するライブラリのバージョンが異なります。
- 2.0.0はJava8以降じゃないと動かないなどの制約があります。
- quickfixjのGitHubのReleasesにまとまっています。
②QuickFIX/Jで扱うメッセージのバージョン
- QuickFIX/Jのライブラリのバージョンが決まったところで、どのメッセージを使うのかを決めます。
- 多くの場合は接続先の会社などと決めるものかと思います。
- バージョンごとのメッセージ情報はFIX Dictionaryのページにまとまっています。
- https://www.onixs.biz/fix-dictionary.html
- ここでいうDictionaryは決まりとかルールみたいな意味で、フォーマットとかバリデーションとかの役割が含まれます。
今回紹介するのは以下のバージョンです
①1.6.4
②FIX50SP1, FIXT1.1(FIX50SP1から、アプリレベルはFIX50SP1, アドミンレベルはFIXT1.1に分かれました)
ダウンロード
quickfixjのGitHubのReleasesから対象のバージョンのzipファイルをダウンロードします。
https://github.com/quickfix-j/quickfixj/releases
解凍すると何やらたくさんjarファイルがありますが、重要なjarファイルは以下です
- quickfixj-all-1.6.4.jar(非推奨)
- ②で触れたメッセージのバージョン一式が入ったjarです。
- 正直これを取り込むのではなくバージョン指定で取り込むのをおすすめします。
- 使いたいメッセージのバージョンではなく別のバージョンを使ってしまっている、なんてことになりかねません。
- quickfixj-messages-XXXXX-1.6.4.jar
- quickfixj-all-1.6.4.jarではなくバージョン指定したjarファイルです
- quickfixj-core-1.6.4.jar
- quickfixjの核となるjarです
- slf4j-api-1.7.22.jar(オプション)
- ログ出力に関するjarです。
- 後述しますが、quickfixjのログ出力はお使いのアプリのログ出力に統合できるため、すでにログ出力の仕組みが構築されている場合は不要です。
- slf4j-jdk14-1.7.22.jar(オプション)
- slf4j-api-1.7.22.jarと同様です
設定次第ですが、最小限動かす分には上記のjarがあれば十分です。
むしろむやみに不要なライブラリを取り込むとよからぬところで影響が出てしまいます。
例えば、既存のプロジェクトでslf4jを使っているのにquickfixjに同封されたslf4jを取り込むとログ出力が期待通りに動かないなんてことが普通に起こります。
また、Saxon-HE-9.8.0-2.jarなんかは署名付きなので、お使いのアプリの署名設定次第ではアプリの署名が壊れます。
ライブラリを使うときは最小限にしましょう。
解凍したjarファイルたちをIDEに取り込むなり、antやらmavenやらgradleやらのビルドツールに取り込むことで使えるようになります。
AcceptorとInitiator
基本的に、ログオンする方がInitiator、ログオンを受け付ける方がAcceptorです。
クライアント・サーバ型に当てはめてクライアント側がInitiator、サーバ側がAcceptorなんて言われたりもします。
Acceptor
acceptor.cfg
[DEFAULT]
ConnectionType=acceptor
StartTime=00:00:00
EndTime=00:00:00
HeartBtInt=30
SenderCompID=ACCEPTOR
TargetCompID=INITIATOR
[SESSION]
BeginString=FIXT.1.1
DefaultApplVerID=FIX.5.0SP1
SocketAcceptPort=9876
AcceptorApp.java
import quickfix.*;
public class AcceptorApp extends MessageCracker implements Application {
@Override
public void onCreate(SessionID sessionId) {}
@Override
public void onLogon(SessionID sessionId) {
System.out.println("Acceptor logged on: " + sessionId);
}
@Override
public void onLogout(SessionID sessionId) {
System.out.println("Acceptor logged out: " + sessionId);
}
@Override
public void toAdmin(Message message, SessionID sessionId) {}
@Override
public void fromAdmin(Message message, SessionID sessionId) {}
@Override
public void toApp(Message message, SessionID sessionId) throws DoNotSend {}
@Override
public void fromApp(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
crack(message, sessionId);
}
public void onMessage(TradeCaptureReport message, SessionID sessionID) {
}
public static void main(String[] args) throws ConfigError {
Application application = new AcceptorApp();
SessionSettings settings = new SessionSettings("acceptor.cfg");
MessageStoreFactory storeFactory = new FileStoreFactory(settings);
LogFactory logFactory = new FileLogFactory(settings);
MessageFactory messageFactory = new DefaultMessageFactory();
Acceptor acceptor = new SocketAcceptor(application, storeFactory, settings, logFactory, messageFactory);
acceptor.start();
}
}
※crackを使うと、onMessageを定義していない型のメッセージが来ると例外がthrowされます。想定外のメッセージが来ることも考えると使い方には注意が必要です。
Initiator
initiator.cfg
[DEFAULT]
ConnectionType=initiator
StartTime=00:00:00
EndTime=00:00:00
HeartBtInt=30
ReconnectInterval=60
SenderCompID=INITIATOR
TargetCompID=ACCEPTOR
[SESSION]
BeginString=FIXT.1.1
DefaultApplVerID=FIX.5.0SP1
SocketConnectHost=localhost
SocketConnectPort=9876
InitiatorApp.java
import quickfix.*;
public class InitiatorApp extends MessageCracker implements Application {
@Override
public void onCreate(SessionID sessionId) {}
@Override
public void onLogon(SessionID sessionId) {
System.out.println("Initiator logged on: " + sessionId);
try {
sendTradeCaptureReport(sessionId);
} catch (SessionNotFound e) {
e.printStackTrace();
}
}
// TradeCaptureReportを送信するメソッド
private void sendTradeCaptureReport(SessionID sessionId) throws SessionNotFound {
TradeCaptureReport tradeCaptureReport = new TradeCaptureReport(
new TradeReportID("12345"), // ユニークID
new TradeReportTransType(TradeReportTransType.NEW) // 新規トレード
);
tradeCaptureReport.set(new Symbol("AAPL")); // 銘柄
tradeCaptureReport.set(new LastQty(100)); // 取引数量
tradeCaptureReport.set(new LastPx(150.25)); // 取引価格
tradeCaptureReport.set(new TradeDate("20230902")); // 取引日
tradeCaptureReport.set(new TransactTime(new java.util.Date())); // トランザクション時間
// メッセージを送信
Session.sendToTarget(tradeCaptureReport, sessionId);
System.out.println("TradeCaptureReport sent.");
}
@Override
public void onLogout(SessionID sessionId) {
System.out.println("Initiator logged out: " + sessionId);
}
@Override
public void toAdmin(Message message, SessionID sessionId) {}
@Override
public void fromAdmin(Message message, SessionID sessionId) {}
@Override
public void toApp(Message message, SessionID sessionId) throws DoNotSend {}
@Override
public void fromApp(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
crack(message, sessionId);
}
public static void main(String[] args) throws ConfigError {
Application application = new InitiatorApp();
SessionSettings settings = new SessionSettings("initiator.cfg");
MessageStoreFactory storeFactory = new FileStoreFactory(settings);
LogFactory logFactory = new FileLogFactory(settings);
final DefaultMessageFactory defaultMessageFactory = new DefaultMessageFactory();
defaultMessageFactory.addFactory(FixVersions.FIX50, quickfix.fix50sp1.MessageFactory.class);
Initiator initiator = new SocketInitiator(application, storeFactory, settings, logFactory, defaultMessageFactory);
initiator.start();
}
}
※FIX50SP1、FIX50SP2を使う場合、以下の処理が必要になります。
defaultMessageFactory.addFactory(FixVersions.FIX50, quickfix.fix50sp1.MessageFactory.class);
参考
https://www.quickfixj.org/jira/browse/QFJ-888
使用方法
まずAcceptorAppのmainを実行します。次にInitiatorAppのmainを実行すれば接続できます。
あとはInitiator側でTradeCaptureReportをnewしてAcceptor側に送信します。
acceptor.cfgやinitiator.cfgの設定
以下のURLにまとまっています。
https://www.quickfixj.org/usermanual/2.3.0/usage/configuration.html
ログ出力の統合
QuiickFIX/Jではログ出力をご使用のアプリケーションのログ出力に統合することができます。
QuiickFIX/Jの処理をQuiickFIX/Jのログではなくアプリログに出力することが可能です。
QuiickFIX/Jの処理をアプリログに出力する方法
Logインタフェースを実装したクラスの作成
import quickfix.Log;
import quickfix.SessionID;
public class CustomLog implements Log {
private final Logger logger;
public CustomLog(SessionID sessionID) {
if (sessionID != null) {
logger = LoggerFactory.getLogger("QuickFIXJ." + sessionID);
} else {
logger = LoggerFactory.getLogger("QuickFIXJ");
}
}
@Override
public void onIncoming(String message) {
logger.info("Incoming: " + message);
}
@Override
public void onOutgoing(String message) {
logger.info("Outgoing: " + message);
}
@Override
public void onEvent(String text) {
logger.info("Event: " + text);
}
@Override
public void onErrorEvent(String text) {
logger.error("Error: " + text);
}
@Override
public void clear() {}
}
LogFactoryインタフェースを実装したクラスの作成
import quickfix.*;
public class CustomLogFactory implements LogFactory {
private final SessionSettings settings;
public CustomLogFactory(SessionSettings settings) {
this.settings = settings;
}
@Override
public Log create(SessionID sessionID) {
return new CustomLog(sessionID);
}
@Override
public Log create() {
return new CustomjLog(null);
}
}
Initiatorをnewするときに作成したCustomLogFactoryクラスをnewして渡す
public static void main(String[] args) throws ConfigError {
Application application = new InitiatorApp();
SessionSettings settings = new SessionSettings("initiator.cfg");
MessageStoreFactory storeFactory = new FileStoreFactory(settings);
LogFactory logFactory = new CustomLogFactory(settings);
final DefaultMessageFactory defaultMessageFactory = new DefaultMessageFactory();
defaultMessageFactory.addFactory(FixVersions.FIX50, quickfix.fix50sp1.MessageFactory.class);
Initiator initiator = new SocketInitiator(application, storeFactory, settings, logFactory, defaultMessageFactory);
initiator.start();
}
ちなみにログはDBに記録させることも可能です。
JdbcStore=Y
JdbcDriver=com.mysql.cj.jdbc.Driver
JdbcURL=jdbc:mysql://localhost:3306/fixdb
JdbcUser=fixuser
JdbcPassword=fixpassword
LogFactoryをinitiatorに渡すところをJdbcLogFactoryにすればDBに記録されます。
テーブル名はデフォルトでmessages_log、event_logですが、JdbcLogIncomingTable(JdbcLogOutgoingTable)、JdbcLogEventTableの設定で変更可能です。
ディクショナリによるバリデーション
ディクショナリによるバリデーションの例
必須バリデーションに抵触した場合は以下のようなメッセージが出ます。
Rejecting message: Missing required field [Tag=55 (Symbol)] in message type D (NewOrderSingle)
RejectInvalidMessage=Yの場合は処理を中断してRejectメッセージをsendします。Nの場合は警告のみがログに出て後続の処理が走ります。
デフォルトではディクショナリによるバリデーションが効いているが、それを無効にして手動でバリデーションをかけることも可能ですが、自動バリデーションと手動バリデーションでは全く同じバリデーションにすることは不可能なので基本は自動バリデーション推奨です。
注意
ValidateFieldsOutOfOrderがデフォルトでYになっており、メッセージヘッダーやボディの中身の順番がディクショナリと一致しない場合にバリデーションに引っ掛かります。
順番通りに実装してあれば問題ありませんが、そこまで考えて作られているのは滅多にないと思うのでNにすることをおすすめします。
シーケンス番号の管理
quickfixではメッセージの順番をシーケンスとして記録します。
記録方法はファイルとDBで大別されます。
ファイル
FileStorePathに指定したパスに.seqnumsというファイルが作成され、そこにシーケンスが記録されています。
DB
JdbcStore=Y
JdbcDriver=com.mysql.cj.jdbc.Driver
JdbcURL=jdbc:mysql://localhost:3306/fixdb
JdbcUser=fixuser
JdbcPassword=fixpassword
デフォルトではテーブル名はmessagesとsessionsですが、JdbcStoreMessagesTableNameとJdbcStoreSessionsTableNameの設定で変更可能です。
SSLによる接続
証明書(opensslやjdkのkeytool等)
- CSRをopensslやjdk同封のkeytoolで作成
- 接続先にCSRを連携し証明書を作ってもらう
- 証明書をjavaで読み込めるようにkeystoreに入れるか専用のJKSを作成
- quickfixjの設定
SocketUseSSL=Y
SocketKeyStore={キーストアファイルのパス}
SocketKeyStorePassword{キーストアファイルのパスワード}
最後に
QuickFIX/Jはシーケンス管理をやってくれたりログ出力を統合できたり、何かと便利な作りになっています。うまく使えばあなたの役に立つでしょう。
随時更新中
コメント