Ocean Lotus Group,也被称之为APT32,这个黑客组织此前主要的攻击目标以越南、老挝和菲律宾等东亚国家为主,虽然私营企业是该组织的主要目标,但外国政府、政治活动家和新闻记者也是他们的攻击目标之一。
APT32的攻击工具非常多样化,从Mimikatz和Cobalt Strike这样的高级定制工具,到ShellCode以及后门等等,应有尽有。而且他们所使用的很多代码都经过了高度模糊处理或混淆处理,并使用了不同的技术来提升检测和分析的难度,导致研究人员更加难以对它们进行逆向分析。
在这篇文章中,我们将介绍该组织所使用的其中一种代码混淆技术,而这种技术也被APT32广泛应用到了他们的后门代码中。反混淆处理的过程中需要使用到Cutter以及官方开源逆向工程框架-radare2,还请各位同学自行搜索下载。
Cutter目前支持Linux、macOS和Windows。
Cutter下载地址:【点击底部阅读原文获取】
Cutter基础教程:【点击底部阅读原文获取】
我们的样本(486be6b1ec73d98fdd3999abe2fa04368933a2ec)是多级感染链中的一部分,而且APT32在多个活动中都使用到了这个后门,例如恶意文件样本(115f3cb5bdfb2ffe5168ecb36b9aed54)。这个文档声称自己来自于360,但是其中包含了一个恶意VBA宏,这个恶意宏会向rundll32.exe注入恶意Shellcode。Shellcode中包含了解密代码,可以直接对恶意代码进行解密并将相应的DLL加载进内存,而DLL包含的就是后门逻辑。
首先,后门会解密一个配置文件,其中存储的信息包含C2服务器基础信息在内。接下来,代码会尝试使用自定义PE加载器向内存中加载恶意DLL。这个DLL会被HTTPProv.dll调用,并能够与C2服务器通信。后门还可以从C2服务器接收十几种不同的指令,包括Shellcode执行、新进程创建以及文件和目录修改等操作。
该组织所使用的很多混淆技术其目的就是要增加逆向分析的难度,而且其二进制代码中使用了大量的垃圾代码,这些垃圾代码会增加样本的体积和复杂性,以分散研究人员的注意力。而且,其中的代码集经常会与堆栈指针一起使用,而普通的反编译工具无法对这种情况进行有效处理。
APT32在进行代码混淆处理时,大量使用了控制流混淆,并且向函数流中注入了大量垃圾代码块。这些垃圾代码块不会实现任何功能,只是为了混淆视听而已。
大家可以从上图中看到,其中包含了大量垃圾代码块。仔细分析后我们会发现,所有需要跳转到这些代码段的条件判断结果都为False,而且都是以条件跳转结束的,跟之前的条件判断正好相反。比如说,垃圾代码段之前的条件判断为jo
这样一来,我们就可以对这些垃圾代码段定性了。第一种特性:出现两个连续的垃圾代码块,以相反的条件跳转到相同的目标地址并结束。第二种特性:要求第二个块不包含有意义的指令,如字符串引用或代码调用等等。
当满足这两个特征时,我们可以说第二个块很可能是垃圾代码块。这样,我们就可以将垃圾块从图表中删除了,并使用无条件跳转来修补源代码。
首先,我们要创建一个Python类作为我们的核心类,这个类需要包含查找和移除垃圾代码块的逻辑。先定义init函数,该函数可以接收管道消息,可以是来自redare2的r2pipe对象(importr2pipe),也可以是来自Cutter的cutter对象(import cutter)。
class GraphDeobfuscator: def __init__(self, pipe): """an initializationfunction for the class Arguments: pipe {r2pipe} -- an instance ofr2pipe or Cutter's wrapper """ self.pipe = pipe
现在我们就可以使用这个管道来执行radare2命令了。这个管道对象包含两种执行r2命令的方式。第一种为pipe.cmd(),它能够以字符串的形式返回命令执行结果。第二种为pipe.cmdj(j),它你能够根据radare2命令的输出结果返回解析后的JSON对象。
接下来就是从当前函数中获取所有的代码块,然后进行迭代。这里可以使用afbj米工龄来获取函数中所有代码块的JSON对象。
def clean_junk_blocks(self): """Search a givenfunction for junk blocks, remove them and fix the flow. """ # Get all the basic blocks of thefunction blocks = self.pipe.cmdj("afbj @$F") if not blocks: print("[X] No blocks found. Is it afunction?") return modified = False # Iterate over all the basic blocks ofthe function for block in blocks: # do something
针对每一个块,根据之前的判断条件进行分析,获取候选垃圾代码块:
def get_fail_block(self, block): """Return the block towhich a block branches if the condition is fails Arguments: block {block_context} -- A JSONrepresentation of a block Returns: block_context -- The block to whichthe branch fails. If not exists, returns None """ # Get the address of the"fail" branch fail_addr = self.get_fail(block) if not fail_addr: return None # Get a block context of the failaddress fail_block = self.get_block(fail_addr) return fail_block if fail_block elseNone def is_successive_fail(self, block_A,block_B): """Check if the endaddress of block_A is the start of block_B Arguments: block_A {block_context} -- A JSONobject to represent the first block block_B {block_context} -- A JSONobject to represent the second block Returns: bool -- True if block_B comes immediatelyafter block_A, False otherwise """ return ((block_A["addr"] +block_A["size"]) == block_B["addr"])
接下来,我们要判断候选垃圾代码段是否包含无效指令:
def contains_meaningful_instructions (self,block): '''Check if a block contains meaningfulinstructions (references, calls, strings,...) Arguments: block {block_context} -- A JSONobject which represents a block Returns: bool -- True if the block containsmeaningful instructions, False otherwise ''' # Get summary of block - strings, calls,references summary = self.pipe.cmd("pdsb @{addr}".format(addr=block["addr"])) return summary != ""
最后,枚举出所有对立的跳转条件:
jmp_pairs = [ ['jno', 'jo'], ['jnp', 'jp'], ['jb', 'jnb'], ['jl', 'jnl'], ['je', 'jne'], ['jns', 'js'], ['jnz', 'jz'], ['jc', 'jnc'], ['ja', 'jbe'], ['jae', 'jb'], ['je', 'jnz'], ['jg', 'jle'], ['jge', 'jl'], ['jpe', 'jpo'], ['jne', 'jz']] def is_opposite_conditional(self, cond_A,cond_B): """Check if two operandsare opposite conditional jump operands Arguments: cond_A {string} -- the conditionaljump operand of the first block cond_B {string} -- the conditionaljump operand of the second block Returns: bool -- True if the operands areopposite, False otherwise """ sorted_pair = sorted([cond_A, cond_B]) for pair in self.jmp_pairs: if sorted_pair == pair: return True return False
将上述所有代码整合到cleanjunkblocks()函数中:
def clean_junk_blocks(self): """Search a givenfunction for junk blocks, remove them and fix the flow. """ # Get all the basic blocks of thefunction blocks = self.pipe.cmdj("afbj @$F") if not blocks: print("[X] No blocks found. Isit a function?") return modified = False # Iterate over all the basic blocks ofthe function for block in blocks: fail_block =self.get_fail_block(block) if not fail_block or \ not self.is_successive_fail(block,fail_block) or \ self.contains_meaningful_instructions(fail_block) or \ notself.is_opposite_conditional(self.get_last_mnem_of_block(block),self.get_last_mnem_of_block(fail_block)): continue
if__name__ == "__main__": graph_deobfuscator = GraphDeobfuscator(pipe) graph_deobfuscator.clean_graph()
ifcutter_available: # This part will be executed only if Cutteris available. # This will create the cutter plugin and UIobjects for the plugin classGraphDeobfuscatorCutter(cutter.CutterPlugin): name = "APT32 GraphDeobfuscator" description = "Graph Deobfuscatorfor APT32 Samples" version = "1.0" author = "Itay Cohen(@Megabeets_)" def setupPlugin(self): pass def setupInterface(self, main): pass def create_cutter_plugin(): return GraphDeobfuscatorCutter()
为了保证插件正常运行,我们还需要增加一个菜单入口来触发反混淆功能:
ifcutter_available: # This part will be executed only if Cutteris available. This will # create the cutter plugin and UI objectsfor the plugin classGraphDeobfuscatorCutter(cutter.CutterPlugin): name = "APT32 GraphDeobfuscator" description = "Graph Deobfuscatorfor APT32 Samples" version = "1.0" author = "Megabeets" def setupPlugin(self): pass def setupInterface(self, main): # Create a new action (menu item) action = QAction("APT32 GraphDeobfuscator", main) action.setCheckable(False) # Connect the action to a function - cleaner. # A click on this action willtrigger the function action.triggered.connect(self.cleaner) # Add the action to the"Windows -> Plugins" menu pluginsMenu =main.getMenuByType(main.MenuType.Plugins) pluginsMenu.addAction(action) def cleaner(self): graph_deobfuscator =GraphDeobfuscator(pipe) graph_deobfuscator.clean_graph() cutter.refresh() def create_cutter_plugin(): return GraphDeobfuscatorCutter()
接下来,我们就可以看到图形化的分析结果了:
移除垃圾代码段之后的结果图如下所示:
对比图如下:
Be6d5973452248cb18949711645990b6a56e7442dc30cc48a607a2afe7d8ec668d74d544396b57e6faa4f8fdf96a1a5e30b196d56c15f7cf05767a406708a6b2
"""A plugin for Cutter and Radare2 to deobfuscate APT32 flow graphsThisis a python plugin for Cutter that is compatible as an r2pipe script forradare2as well. The plugin will help reverse engineers to deobfuscate and removejunkblocks from APT32 (Ocean Lotus) samples."""__author__ = "Itay Cohen, aka @megabeets_"__company__= "Check Point Software Technologies Ltd"#Check if we're running from cuttertry: import cutter from PySide2.QtWidgets import QAction pipe = cutter cutter_available = True# Ifno, assume running from radare2except: import r2pipe pipe = r2pipe.open() cutter_available = FalseclassGraphDeobfuscator: # A list of pairs of opposite conditionaljumps jmp_pairs = [ ['jno', 'jo'], ['jnp', 'jp'], ['jb', 'jnb'], ['jl', 'jnl'], ['je', 'jne'], ['jns', 'js'], ['jnz', 'jz'], ['jc', 'jnc'], ['ja', 'jbe'], ['jae', 'jb'], ['je', 'jnz'], ['jg', 'jle'], ['jge', 'jl'], ['jpe', 'jpo'], ['jne', 'jz']] def __init__(self, pipe, verbose=False): """an initializationfunction for the class Arguments: pipe {r2pipe} -- an instance ofr2pipe or Cutter's wrapper Keyword Arguments: verbose {bool} -- if True willprint logs to the screen (default: {False}) """ self.pipe = pipe self.verbose = verbose def is_successive_fail(self, block_A,block_B): """Check if the endaddress of block_A is the start of block_B Arguments: block_A {block_context} -- A JSONobject to represent the first block block_B {block_context} -- A JSONobject to represent the second block Returns: bool -- True if block_B comesimmediately after block_A, False otherwise """ return ((block_A["addr"] +block_A["size"]) == block_B["addr"]) def is_opposite_conditional(self, cond_A,cond_B): """Check if two operandsare opposite conditional jump operands Arguments: cond_A {string} -- the conditionaljump operand of the first block cond_B {string} -- the conditionaljump operand of the second block Returns: bool -- True if the operands areopposite, False otherwise """ sorted_pair = sorted([cond_A, cond_B]) for pair in self.jmp_pairs: if sorted_pair == pair: return True return False defcontains_meaningful_instructions (self, block): '''Check if a block contains meaningfulinstructions (references, calls, strings,...) Arguments: block {block_context} -- A JSONobject which represents a block Returns: bool -- True if the block containsmeaningful instructions, False otherwise ''' # Get summary of block - strings,calls, references summary = self.pipe.cmd("pdsb @{addr}".format(addr=block["addr"])) return summary != "" def get_block_end(self, block): """Get the address ofthe last instruction in a given block Arguments: block {block_context} -- A JSONobject which represents a block Returns: The address of the last instructionin the block """ # save current seek self.pipe.cmd("s{addr}".format(addr=block['addr'])) # This will return the address of ablock's last instruction block_end = self.pipe.cmd("?v $ :-1") return block_end def get_last_mnem_of_block(self, block): """Get the mnemonic ofthe last instruction in a block Arguments: block {block_context} -- A JSONobject which represents a block Returns: string -- the mnemonic of the lastinstruction in the given block """ inst_info = self.pipe.cmdj("aoj @{addr}".format(addr=self.get_block_end(block)))[0] return inst_info["mnemonic"] def get_jump(self, block): """Get the address towhich a block jumps Arguments: block {block_context} -- A JSONobject which represents a block Returns: addr -- the address to which theblock jumps to. If such address doesn't exist, returns False """ return block["jump"] if"jump" in block else None def get_fail_addr(self, block): """Get the address towhich a block fails Arguments: block {block_context} -- A JSONobject which represents a block Returns: addr -- the address to which theblock fail-branches to. If such address doesn't exist, returns False """ return block["fail"] if"fail" in block else None def get_block(self, addr): """Get the block contextin a given address Arguments: addr {addr} -- An address in ablock Returns: block_context -- the block to whichthe address belongs """ block = self.pipe.cmdj("abj. @{offset}".format(offset=addr)) return block[0] if block else None def get_fail_block(self, block): """Return the block towhich a block branches if the condition is fails Arguments: block {block_context} -- A JSONrepresentation of a block Returns: block_context -- The block to whichthe branch fails. If not exists, returns None """ # Get the address of the"fail" branch fail_addr = self.get_fail_addr(block) if not fail_addr: return None # Get a block context of the failaddress fail_block = self.get_block(fail_addr) return fail_block if fail_block elseNone def reanalize_function(self): """Re-Analyze a functionat a given address Arguments: addr {addr} -- an address of afunction to be re-analyze """ # Seek to the function's start self.pipe.cmd("s $F") # Undefine the function in this address self.pipe.cmd("af- $") # Define and analyze a function in thisaddress self.pipe.cmd("afr @ $") def overwrite_instruction(self, addr): """Overwrite aconditional jump to an address, with a JMP to it Arguments: addr {addr} -- address of aninstruction to be overwritten """ jump_destination =self.get_jump(self.pipe.cmdj("aoj @ {addr}".format(addr=addr))[0]) if (jump_destination): self.pipe.cmd("wai jmp0x{dest:x} @ {addr}".format(dest=jump_destination, addr=addr)) def get_current_function(self): """Return the startaddress of the current function Return Value: The address of the currentfunction. None if no function found. """ function_start =int(self.pipe.cmd("?vi $FB")) return function_start if function_start!= 0 else None def clean_junk_blocks(self): """Search a givenfunction for junk blocks, remove them and fix the flow. """ # Get all the basic blocks of thefunction blocks = self.pipe.cmdj("afbj ) "if not blocks: print("[X] No blocks found. Isit a function?") return # Have we modified any instruction inthe function? # If so, a reanalyze of the function isrequired modified = False # Iterate over all the basic blocks ofthe function for block in blocks: fail_block =self.get_fail_block(block) # Make validation checks if not fail_block or \ not self.is_successive_fail(block,fail_block) or \ self.contains_meaningful_instructions(fail_block) or \ notself.is_opposite_conditional(self.get_last_mnem_of_block(block),self.get_last_mnem_of_block(fail_block)): continue if self.verbose: print ("Potential junk:0x{junk_block:x}(0x{fix_block:x})".format(junk_block=fail_block["addr"],fix_block=block["addr"])) self.overwrite_instruction(self.get_block_end(block)) modified = True if modified: self.reanalize_function() def clean_graph(self): """the initial functionof the class. Responsible to enable cache and start the cleaning """ # Enable cache writing mode. changeswill only take place in the session and # will not override the binary self.pipe.cmd("eio.cache=true") self.clean_junk_blocks() ifcutter_available: # This part will be executed only if Cutteris available. This will # create the cutter plugin and UI objectsfor the plugin classGraphDeobfuscatorCutter(cutter.CutterPlugin): name = "APT32 GraphDeobfuscator" description = "Graph Deobfuscatorfor APT32 Samples" version = "1.0" author = "Itay Cohen()" def setupPlugin(self): pass def setupInterface(self, main): # Create a new action (menu item) action = QAction("APT32 GraphDeobfuscator", main) action.setCheckable(False) # Connect the action to a function- cleaner. # A click on this action willtrigger the function action.triggered.connect(self.cleaner) # Add the action to the"Windows -> Plugins" menu pluginsMenu =main.getMenuByType(main.MenuType.Plugins) pluginsMenu.addAction(action) def cleaner(self): graph_deobfuscator =GraphDeobfuscator(pipe) graph_deobfuscator.clean_graph() cutter.refresh() def create_cutter_plugin(): return GraphDeobfuscatorCutter()if__name__ == "__main__": graph_deobfuscator =GraphDeobfuscator(pipe)graph_deobfuscator.clean_graph()
*本文作者:checkpoint,转载请注明来自FreeBuf.COM
精彩推荐