HAYABUSAのなかみ(Hayabusa internals)
こんにちは、かずみん (@k47_um1n) | Twitterです。
2021/12/25に、OSS Hayabusa がリリースされたのは、知っていますか? 知らない人向けに簡単に説明すると、”Windowsイベントログのファストフォレンジックタイムライン生成およびスレットハンティングツール”です。 詳しく知りたい人は、日本語REDMEを見てみるとよいです。
また、コアコミッターの いちび (@itiB_S144) | Twitter さんの記事を先に見てみるとよいです。
Windowsイベントログ解析ツール「Hayabusa」を使ってみる - itiblog
この記事では、コミッターである私がHayabusaの中身について、コードベースで説明していきます。 先に、READMEに沿って、ツールを使ってみると、解説が理解しやすいと思います。
v1.0.0の解説です。 Hayabusaは、Rustで書かれています。
※私の解説した処理以外の機能もあるので、ご注意。基本的に、私の興味があるところを解説します。
ディレクトリ構成
ルール処理に関することは、 hayabusa/src/detections/rule
ディレクトリに書かれており、
この中のエントリーポイントは、hayabusa/src/detections/detection.rs
やhayabusa/src/detections/mod.rs
などから呼び出されています。
検知処理
検知処理の私の面白そうな部分としては、ルールで論理条件を指定できるcondition項目があります。 そこでは、count処理とand/or/notを指定できます。 以下では、その2つの処理を見ていきたいと思います。
処理の概要
インタプリタ言語処理みたいに、このようになってます。
tokenize -> parser -> eval
parser部分では、SelectionNode木を作成しています。
eval部分では、この木をselect()で実行しています。
この後も、最後に説明するcount処理が入ったりします。
入口
とりあえず、main関数を見ていく。検知部分までは、駆け足でいきます。 githubで見ながら説明を見るとわかるかも。
hayabusa/main.rs at v1.0.0 · Yamato-Security/hayabusa · GitHub
関数フローは、以下のようになります。
- main() - exec() - analysis_files() - parse_rule_files() - rule.init() - compile_condition() - analysis_file() - detection.start() - add_aggcondition_msges()
exec()の中で、各オプションごとの処理を分岐していきます。 ログファイルが指定されたら、それをanalysis_files()に渡す。 ログディレクトリが指定されたら、それをanalysis_files()に渡す。 両方とも配列にして渡す。
parse_rule_files()は、その名の通り、全部のルールをパースする。パースした時に、オプションに従って、ファイルを除外したりしている。condition項目をパースする処理も含まれ、compile_condition()でパースする。あとでこの処理も説明する。
analysis_file()は複数ファイルを単独で処理するために、このログを検知してくれと、analysis_files()から呼ばれる。alalysis_file()は、and/or/not処理用です。add_aggcondition_msges()がcount処理用です。つまり、これらにフォーカスして見ていけばよい。
ここからは、次の章で個別に見ていく。
条件式パース
結構、わかりやすい日本語のコメントを読むと、理解できる。
pub enum ConditionToken { LeftParenthesis, RightParenthesis, Space, Not, And, Or, SelectionReference(String), // パースの時に上手く処理するために作った疑似的なトークン ParenthesisContainer(Vec<ConditionToken>), // 括弧を表すトークン AndContainer(Vec<ConditionToken>), // ANDでつながった条件をまとめるためのトークン OrContainer(Vec<ConditionToken>), // ORでつながった条件をまとめるためのトークン NotContainer(Vec<ConditionToken>), // 「NOT」と「NOTで否定される式」をまとめるためのトークン この配列には要素が一つしか入らないが、他のContainerと同じように扱えるようにするためにVecにしている。あんまり良くない。 OperandContainer(Vec<ConditionToken>), // ANDやORやNOT等の演算子に対して、被演算子を表す }
トークンがenumで定義されています。ConditonTokenの中にConditonTokenの配列があり、入れ子になってます。 Rustにはパターンマッチがあるので、便利。
fn to_enum(&self, token: String) -> ConditionToken { if token == "(" { return ConditionToken::LeftParenthesis; } else if token == ")" { return ConditionToken::RightParenthesis; } else if token == " " { return ConditionToken::Space; } else if token == "not" { return ConditionToken::Not; } else if token == "and" { return ConditionToken::And; } else if token == "or" { return ConditionToken::Or; } else { return ConditionToken::SelectionReference(token.clone()); } }
pub static ref CONDITION_REGEXMAP: Vec<Regex> = vec![ Regex::new(r"^\(").unwrap(), Regex::new(r"^\)").unwrap(), Regex::new(r"^ ").unwrap(), Regex::new(r"^\w+").unwrap(), ];
正規表現を配列で定義している。括弧と空白と文字。
Regex::new()はコストのかかる処理で、都度呼び出すと処理速度に大きな影響を与えます。呼び出し回数を削減するために、lazy_staticを使ってRegexを使いまわすようにしています。
fn compile_condition_body(
&self,
condition_str: String,
name_2_node: &HashMap<String, Arc<Box<dyn SelectionNode>>>,
) -> Result<Box<dyn SelectionNode>, String> {
let tokens = self.tokenize(&condition_str)?;
let parsed = self.parse(tokens)?;
return self.to_selectnode(parsed, name_2_node);
}
トークナイズをやって、パースする。
ConditionTokenをSelectNodeに変更するのがto_selectnode()。これが大事。
fn tokenize(&self, condition_str: &String) -> Result<Vec<ConditionToken>, String> { let mut cur_condition_str = condition_str.clone(); let mut tokens = Vec::new(); while cur_condition_str.len() != 0 { let captured = self::CONDITION_REGEXMAP.iter().find_map(|regex| { return regex.captures(cur_condition_str.as_str()); }); if captured.is_none() { // トークンにマッチしないのはありえないという方針でパースしています。 return Result::Err("An unusable character was found.".to_string()); } let mached_str = captured.unwrap().get(0).unwrap().as_str(); let token = self.to_enum(mached_str.to_string()); if let ConditionToken::Space = token { // 空白は特に意味ないので、読み飛ばす。 cur_condition_str = cur_condition_str.replacen(mached_str, "", 1); continue; } tokens.push(token); cur_condition_str = cur_condition_str.replacen(mached_str, "", 1); } return Result::Ok(tokens); }
これは、トークナイザー。トークンに分ける処理ですね。
let token = self.to_enum(mached_str.to_string());
で文字列をトークンに変換して、tokens.push(token);
でConditionToken型の配列に入れていますね。
トークンに一致しないと、エラーを出していて、
空白だったら、continueしてますね。
self::CONDITION_REGEXMAP.iter().find_map(|regex| {
return regex.captures(cur_condition_str.as_str());
});
これは、正規表現の配列を前から実行していて、先頭で一致した文字列を返している。
fn parse(&self, tokens: Vec<ConditionToken>) -> Result<ConditionToken, String> { // 括弧で囲まれた部分を解析します。 // (括弧で囲まれた部分は後で解析するため、ここでは一時的にConditionToken::ParenthesisContainerに変換しておく) // 括弧の中身を解析するのはparse_rest_parenthesis()で行う。 let tokens = self.parse_parenthesis(tokens)?; // AndとOrをパースする。 let tokens = self.parse_and_or_operator(tokens)?; // OperandContainerトークンの中身をパースする。(現状、Notを解析するためだけにある。将来的に修飾するキーワードが増えたらここを変える。) let token = self.parse_operand_container(tokens)?; // 括弧で囲まれている部分を探して、もしあればその部分を再帰的に構文解析します。 return self.parse_rest_parenthesis(token); }
構文解析をする肝心のパーサーです。parse_rest_parenthesis()で最後に再帰的に構文解析しているらしい。 「ConditionToken::ParenthesisContainerに変換しておく」のがみそだと思う。
fn parse_and_or_operator(&self, tokens: Vec<ConditionToken>) -> Result<ConditionToken, String> { for (i, token) in tokens.into_iter().enumerate() { . . . if (i % 2 == 1) != self.is_logical(&token) { // インデックスが奇数の時はLogicalOperatorで、インデックスが偶数のときはOperandContainerになる return Result::Err( "The use of a logical operator(and, or) was wrong.".to_string(), ); } if i % 2 == 0 { // ここで再帰的にAND,ORをパースする関数を呼び出す operand_list.push(token); } else { operator_list.push(token); } } let mut operant_ite = operand_list.into_iter(); let mut operands = vec![operant_ite.next().unwrap()]; for token in operator_list.iter() { if let ConditionToken::Or = token { // Orの場合はそのままリストに追加 operands.push(operant_ite.next().unwrap()); } else { // Andの場合はANDでつなげる let and_operands = vec![operands.pop().unwrap(), operant_ite.next().unwrap()]; let and_container = ConditionToken::AndContainer(and_operands); operands.push(and_container); } } . . . }
and orをパースする。長いので割愛してるが重要なところだけを説明する。
i % 2 == 0
のところが重要。どれがオペランドかオペレータかを判別している。iは配列のインデックスであり、括弧があっても、その数式で判別できる。
たとえば、
A and C and B
という入力があると、
AndContainer([AndContainer([ SelectionReference("A"), SelectionReference("C")]), SelectionReference("B")])
を生成する。
これは、木になっており、あとで解説するiter().all()によって、上から再帰的に実行される。
if let ConditionToken::AndContainer(sub_tokens) = token { let mut select_and_node = AndSelectionNode::new(); for sub_token in sub_tokens.into_iter() { let sub_node = self.to_selectnode(sub_token, name_2_node)?; select_and_node.child_nodes.push(sub_node); } return Result::Ok(Box::new(select_and_node)); }
これは、to_selectnode()の一部。ConditionTokenからSelectionNodeへ変更してる。 こんな感じに、tokenの中のsub_tokensを再帰している。
こんな感じに、SelectionNode木が作成される。
countパース
rule/ディレクトリに入ってるコードは、日本語コメントがあって優しい。
pub static ref AGGREGATION_REGEXMAP: Vec<Regex> = vec![ Regex::new(r"^count\( *\w* *\)").unwrap(), // countの式 Regex::new(r"^ ").unwrap(), Regex::new(r"^by").unwrap(), Regex::new(r"^==").unwrap(), Regex::new(r"^<=").unwrap(), Regex::new(r"^>=").unwrap(), Regex::new(r"^<").unwrap(), Regex::new(r"^>").unwrap(), Regex::new(r"^\w+").unwrap(), ];
さっきと同様に、正規表現を配列で用意している。
pub enum AggregationConditionToken { COUNT(String), // count SPACE, // 空白 BY, // by EQ, // ..と等しい LE, // ..以下 LT, // ..未満 GE, // ..以上 GT, // .よりおおきい KEYWORD(String), // BYのフィールド名 }
pub fn compile(&self, condition_str: String) -> Result<Option<AggregationParseInfo>, String> { let result = self.compile_body(condition_str); . . . }
コンパイルエントリ。compile_body()でメイン処理をする。
pub fn compile_body( &self, condition_str: String, ) -> Result<Option<AggregationParseInfo>, String> { // パイプの部分だけを取り出す let captured = self::RE_PIPE.captures(&condition_str); if captured.is_none() { // パイプが無いので終了 return Result::Ok(Option::None); } // ハイプ自体は削除してからパースする。 let aggregation_str = captured .unwrap() .get(0) .unwrap() .as_str() .to_string() .replacen("|", "", 1); let tokens = self.tokenize(aggregation_str)?; return self.parse(tokens); }
パイプが無いとエラーが起きる。ソースコードから、conditionの書き方が分かったりもする。 パイプを削除してから、トークナイザーを実行。その処理は、and/or/notとほぼ同じなので、割愛。 パーサーを見てみよう。
と思ったけど、ソースコードのコメント見てね。
https://github.com/Yamato-Security/hayabusa/blob/main/src/detections/rule/aggregation_parser.rs#L134
and/or/not検知(eval)
関数フローのdetection.start()で並列に解析を実行していく。
Rustの優秀な関数を使うので、便利。
// Ruleファイルの detection- selection配下のノードはこのtraitを実装する。 pub trait SelectionNode: mopa::Any { // 引数で指定されるイベントログのレコードが、条件に一致するかどうかを判定する // このトレイトを実装する構造体毎に適切な判定処理を書く必要がある。 fn select(&self, event_record: &EvtxRecordInfo) -> bool; // 初期化処理を行う // 戻り値としてエラーを返却できるようになっているので、Ruleファイルが間違っていて、SelectionNodeを構成出来ない時はここでエラーを出す // AndSelectionNode等ではinit()関数とは別にnew()関数を実装しているが、new()関数はただインスタンスを作るだけにして、あまり長い処理を書かないようにしている。 // これはRuleファイルのパースのエラー処理をinit()関数にまとめるためにこうしている。 fn init(&mut self) -> Result<(), Vec<String>>; // 子ノードを取得する(グラフ理論のchildと同じ意味) fn get_childs(&self) -> Vec<&Box<dyn SelectionNode>>; // 子孫ノードを取得する(グラフ理論のdescendantと同じ意味) fn get_descendants(&self) -> Vec<&Box<dyn SelectionNode>>; } mopafy!(SelectionNode);
このトレイトと満たすものは、中のメソッドは呼べる。
pub fn select(&mut self, event_record: &EvtxRecordInfo) -> bool { let result = self.detection.select(event_record); if result && self.has_agg_condition() { count::count(self, &event_record.record); } return result; }
これは、impl RuleNodeのselect()。
self.detectionは、SelectionNode型なので、これを満たす構造体は、select()を呼べる。
AndSelectionNode, NotSelectionNode, OrSelectionNode, RefSelectionNode, LeafSelectionNodeがSelectionNodeトレイトを満たす。
それぞれ、同じようなselect()になっていて、andのselect()実装を見てみる。
fn select(&self, event_record: &EvtxRecordInfo) -> bool { return self.child_nodes.iter().all(|child_node| { return child_node.select(event_record); }); }
iter().all()が非常に優秀。入れ子になっているSelectionNodeがすべてtrueだったら、trueを返す。
orだったら、iter().any()を呼ぶ。どれかひとつでもtrueだったら、true。 この仕組みで、論理式を判定している。
再帰で入れ子を実行実行していくと、最後には、 LeafSelectionNode構造体を実行することになり、これのselect()が以下のようになっている。
let values = utils::get_event_value(&"Event.EventData.Data".to_string(), &event_record.record); let eventdata_data = values.unwrap(); if eventdata_data.is_boolean() || eventdata_data.is_i64() || eventdata_data.is_string() { return self .matcher .as_ref() .unwrap() .is_match(event_record.get_value(self.get_key()), event_record); }
レコード(evtxの中の1つ分)からEvent.EventData.Data項目を取ってきて、boolかi64かstringだったら、is_matchを読んで確認する。(これは一部で、他の処理もしている)
ここからはmatcherに入るので、少々複雑。私があまり興味ないので、雑です。 self.matcherの中身は、RuleNode.detectionが初期化されるときに、作られる。
LeafSelectionNode構造体のselect()の最初の
if self.matcher.is_none() { return false; }
は、conditionの中の変数の中身が定義されていないなら、ルールにマッチしないことを返す。
self.matcherにandとorで使う変数の名前と中身が保存されている。
is_match()で変数の中身とレコードの中身が正規表現で一致すれば、trueを返す。
count処理
timeframeについて書く。
pub fn aggregation_condition_select(rule: &RuleNode) -> Vec<AggResult> { // recordでaliasが登録されている前提とする let value_map = &rule.countdata; let mut ret = Vec::new(); for (key, value) in value_map { ret.append(&mut judge_timeframe(&rule, &value, &key.to_string())); } return ret; }
これがcount処理のエントリーポイント。カウントしたい文字列ごとにjudge_timeframe()を呼んでますね。 並列化をしてないようです。今後そうすると、早くなるかもしれませんね。
judge_timeframe()を見ていきましょう。
pub fn judge_timeframe( rule: &RuleNode, time_datas: &Vec<AggRecordTimeInfo>, key: &String, ) -> Vec<AggResult> { let mut ret: Vec<AggResult> = Vec::new(); if time_datas.is_empty() { return ret; } // AggRecordTimeInfoを時間順がソートされている前提で処理を進める let mut datas = time_datas.clone(); datas.sort_by(|a, b| a.record_time.cmp(&b.record_time)); // timeframeの設定がルールにない時は最初と最後の要素の時間差をtimeframeに設定する。 let def_frame = &datas.last().unwrap().record_time.timestamp() - &datas.first().unwrap().record_time.timestamp(); let frame = get_sec_timeframe(rule).unwrap_or(def_frame); // left <= i < rightの範囲にあるdata[i]がtimeframe内にあるデータであると考える let mut left: i64 = 0; let mut right: i64 = 0; let mut counter = _create_counter(rule); let data_len = datas.len() as i64; // rightは開区間なので+1 while left < data_len && right < data_len + 1 { // timeframeの範囲にある限りrightをincrement while right < data_len && _is_in_timeframe(left, right, frame, &datas) { counter.add_data(right, &datas, rule); right = right + 1; } let cnt = counter.count(); if select_aggcon(cnt as i64, rule) { // 条件を満たすtimeframeが見つかった ret.push(counter.create_agg_result(left, &datas, cnt, key, rule)); left = right; } else { // 条件を満たさなかったので、rightとleftを+1ずらす counter.add_data(right, &datas, rule); right += 1; counter.remove_data(left, &datas, rule); left += 1; } } return ret; }
ルールで「24時間内にログインが5回以上あったら攻撃と見なす」などと書ける。timeframeは、この24時間のこと。
区間問題系のアルゴリズムを使っている。 尺取り法というアルゴリズムを参考にしています。それにより、計算量を愚直に計算したO(N*N)より早くしています。(参考資料)
recordの中にtargetの文字列が存在するかどうかの判定は、RuleNode構造体を初期化するときにselect()で処理しています。それを終えているとして、話を進めます。
まず、時間順にソートする。
leftが区間の左端を指していて、rightが区間の右端を指している。
leftもrightも、0で初期化されている。0でframe内であれば、countとrightをインクリメントする。
条件を満たさなくなると、ルールを満たしているか判断するselect_aggcon()を実行する。by、<、>、などを満たしていると、必要な情報を ret.push
して、leftにrightを代入して、またそこから開始する。満たしてなければ、leftとrightをインクリメントして、timeframe区間ごと、右に移行してする。
区間がtimeframe内かどうかを判定するのは、以下の _is_in_timeframe()で処理をしている。
fn _is_in_timeframe(left: i64, right: i64, frame: i64, datas: &Vec<AggRecordTimeInfo>) -> bool { let left_time = _get_timestamp(left, datas); let left_time_nano = _get_timestamp_subsec_nano(left, datas); // evtxのSystemTimeは小数点7桁秒まで記録されているので、それを考慮する let mut right_time = _get_timestamp(right, datas); let right_time_nano = _get_timestamp_subsec_nano(right, datas); if right_time_nano > left_time_nano { right_time += 1; } return right_time - left_time <= frame; }
小数点の細かいところまで考慮しているそうです。
right_time - left_time <= frame
こんな感じで判定しているね。
ちょっと長いので、select_aggcon()はURLを張り付けておきますね。
以上です。最後まで読んでくださって、ありがとうございます。
最後に
OSS Hayabusaは大和セキュリティーにより、開発されました。開発者やAD hacking(ルール作成)などを募集しています。 興味ある方は、以下のURLでslackに参加して、#hayabusa-*チャンネルで自己紹介から始めてください。ちょっと、話だけでも聞きたい!でも大丈夫です。お待ちしております。
また、
Hayabusaのバグや追加してしい機能などの報告は、GitHub issueで受け付けております。なお、githubがよくわからない方は、slackで気軽に意見を募集しております。
この記事をきっかけにHayabusaに興味を持ってくれると、幸いです。
それでは!よい、セキュリティーライフを!!