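I wrote about Flink earlier, but stream processing is more than Flink, Storm, and Spark Streaming; frankly, those are all fairly traditional stream processing frameworks. Today I want to introduce something that not everyone uses much but that has a lot of character: NiFi.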
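Apache NiFi is a real-time dataflow processing system. It was open-sourced by the U.S. National Security Agency (NSA) in 2014 and entered the Apache community; the project's original name was NiagaraFiles. After NiFi was open-sourced, some of its early developers at the NSA founded the startup Onyara, which continued developing NiFi and provided related support. Hortonworks then acquired Onyara and folded its developers into its own team, which led to the HDF (Hortonworks DataFlow) platform.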
Below is the official summary of its key capabilities, which is worth reading carefully:
Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. Some of the high-level capabilities and objectives of Apache NiFi include:
Web-based user interface
  Seamless experience between design, control, feedback, and monitoring
Highly configurable
  Loss tolerant vs guaranteed delivery
  Low latency vs high throughput
  Dynamic prioritization
  Flow can be modified at runtime
  Back pressure
Data Provenance
  Track dataflow from beginning to end
Designed for extension
  Build your own processors and more
  Enables rapid development and effective testing
Secure
  SSL, SSH, HTTPS, encrypted content, etc...
  Multi-tenant authorization and internal authorization/policy management
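To make "back pressure" and "dynamic prioritization" from the list above a bit more concrete, here is a minimal toy sketch in Python. It is not NiFi code and does not use any NiFi API: the `FlowFile` and `Connection` classes, the `priority` attribute, and `run_demo` are all hypothetical names made up for illustration. It assumes a connection with an object-count threshold and a prioritizer that serves the lowest priority number first. Real NiFi stops scheduling the upstream processor once a queue reaches its threshold; this sketch simplifies that to rejecting the enqueue.

```python
import heapq
import itertools
from dataclasses import dataclass


@dataclass
class FlowFile:
    """Toy stand-in for a FlowFile: a dict of attributes plus content bytes."""
    attributes: dict
    content: bytes = b""


class Connection:
    """Toy queue between two processors with an object-count threshold."""

    def __init__(self, object_threshold):
        self.object_threshold = object_threshold
        self._heap = []
        # Sequence number breaks ties so equal priorities stay in FIFO order.
        self._seq = itertools.count()

    def is_full(self):
        return len(self._heap) >= self.object_threshold

    def offer(self, flowfile):
        """Enqueue and return True, or return False when back pressure applies."""
        if self.is_full():
            return False
        # Hypothetical convention: lower "priority" value is served first.
        priority = int(flowfile.attributes.get("priority", 10))
        heapq.heappush(self._heap, (priority, next(self._seq), flowfile))
        return True

    def poll(self):
        """Return the highest-priority queued FlowFile, or None if empty."""
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]


def run_demo():
    conn = Connection(object_threshold=3)
    accepted = []
    for i, prio in enumerate([5, 1, 9, 2]):
        ff = FlowFile({"filename": f"f{i}", "priority": str(prio)})
        accepted.append(conn.offer(ff))
    drained = []
    while (ff := conn.poll()) is not None:
        drained.append(ff.attributes["filename"])
    return accepted, drained
```

In this sketch the fourth FlowFile is refused because the queue already holds three objects, and the queued ones are drained in priority order rather than arrival order.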
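In summary, as a stream processing engine, NiFi has two core differentiators:
A rich set of operators
NiFi integrates processing capabilities for a large number of data sources. The documentation on the NiFi website (https://nifi.apache.org/docs.html) describes what each operator (a "processor" in NiFi's terminology) can do. The components are listed below to give you a feel for it; the selection is quite rich.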
Processors
AttributeRollingWindow 1.3.0
AttributesToJSON 1.3.0
Base64EncodeContent 1.3.0
CaptureChangeMySQL 1.3.0
CompareFuzzyHash 1.3.0
CompressContent 1.3.0
ConnectWebSocket 1.3.0
ConsumeAMQP 1.3.0
ConsumeEWS 1.3.0
ConsumeIMAP 1.3.0
ConsumeJMS 1.3.0
ConsumeKafka 1.3.0
ConsumeKafka_0_10 1.3.0
ConsumeKafkaRecord_0_10 1.3.0
ConsumeMQTT 1.3.0
ConsumePOP3 1.3.0
ConsumeWindowsEventLog 1.3.0
ControlRate 1.3.0
ConvertAvroSchema 1.3.0
ConvertAvroToJSON 1.3.0
ConvertAvroToORC 1.3.0
ConvertCharacterSet 1.3.0
ConvertCSVToAvro 1.3.0
ConvertExcelToCSVProcessor 1.3.0
ConvertJSONToAvro 1.3.0
ConvertJSONToSQL 1.3.0
ConvertRecord 1.3.0
CreateHadoopSequenceFile 1.3.0
DebugFlow 1.3.0
DeleteDynamoDB 1.3.0
DeleteGCSObject 1.3.0
DeleteHDFS 1.3.0
DeleteS3Object 1.3.0
DeleteSQS 1.3.0
DetectDuplicate 1.3.0
DistributeLoad 1.3.0
DuplicateFlowFile 1.3.0
EncryptContent 1.3.0
EnforceOrder 1.3.0
EvaluateJsonPath 1.3.0
EvaluateXPath 1.3.0
EvaluateXQuery 1.3.0
ExecuteFlumeSink 1.3.0
ExecuteFlumeSource 1.3.0
ExecuteProcess 1.3.0
ExecuteScript 1.3.0
ExecuteSQL 1.3.0
ExecuteStreamCommand 1.3.0
ExtractAvroMetadata 1.3.0
ExtractCCDAAttributes 1.3.0
ExtractEmailAttachments 1.3.0
ExtractEmailHeaders 1.3.0
ExtractGrok 1.3.0
ExtractHL7Attributes 1.3.0
ExtractImageMetadata 1.3.0
ExtractMediaMetadata 1.3.0
ExtractText 1.3.0
ExtractTNEFAttachments 1.3.0
FetchAzureBlobStorage 1.3.0
FetchDistributedMapCache 1.3.0
FetchElasticsearch 1.3.0
FetchElasticsearch5 1.3.0
FetchElasticsearchHttp 1.3.0
FetchFile 1.3.0
FetchFTP 1.3.0
FetchGCSObject 1.3.0
FetchHBaseRow 1.3.0
FetchHDFS 1.3.0
FetchParquet 1.3.0
FetchS3Object 1.3.0
FetchSFTP 1.3.0
FuzzyHashContent 1.3.0
GenerateFlowFile 1.3.0
GenerateTableFetch 1.3.0
GeoEnrichIP 1.3.0
GetAzureEventHub 1.3.0
GetCouchbaseKey 1.3.0
GetDynamoDB 1.3.0
GetFile 1.3.0
GetFTP 1.3.0
GetHBase 1.3.0
GetHDFS 1.3.0
GetHDFSEvents 1.3.0
GetHDFSSequenceFile 1.3.0
GetHTMLElement 1.3.0
GetHTTP 1.3.0
GetIgniteCache 1.3.0
GetJMSQueue 1.3.0
GetJMSTopic 1.3.0
GetKafka 1.3.0
GetMongo 1.3.0
GetSFTP 1.3.0
GetSNMP 1.3.0
GetSolr 1.3.0
GetSplunk 1.3.0
GetSQS 1.3.0
GetTCP 1.3.0
GetTwitter 1.3.0
HandleHttpRequest 1.3.0
HandleHttpResponse 1.3.0
HashAttribute 1.3.0
HashContent 1.3.0
IdentifyMimeType 1.3.0
InferAvroSchema 1.3.0
InvokeHTTP 1.3.0
InvokeScriptedProcessor 1.3.0
ISPEnrichIP 1.3.0
JoltTransformJSON 1.3.0
ListAzureBlobStorage 1.3.0
ListDatabaseTables 1.3.0
ListenBeats 1.3.0
ListenHTTP 1.3.0
ListenLumberjack 1.3.0
ListenRELP 1.3.0
ListenSMTP 1.3.0
ListenSyslog 1.3.0
ListenTCP 1.3.0
ListenUDP 1.3.0
ListenWebSocket 1.3.0
ListFile 1.3.0
ListFTP 1.3.0
ListGCSBucket 1.3.0
ListHDFS 1.3.0
ListS3 1.3.0
ListSFTP 1.3.0
LogAttribute 1.3.0
LogMessage 1.3.0
LookupAttribute 1.3.0
LookupRecord 1.3.0
MergeContent 1.3.0
ModifyBytes 1.3.0
ModifyHTMLElement 1.3.0
MonitorActivity 1.3.0
Notify 1.3.0
ParseCEF 1.3.0
ParseEvtx 1.3.0
ParseSyslog 1.3.0
PartitionRecord 1.3.0
PostHTTP 1.3.0
PublishAMQP 1.3.0
PublishJMS 1.3.0
PublishKafka 1.3.0
PublishKafka_0_10 1.3.0
PublishKafkaRecord_0_10 1.3.0
PublishMQTT 1.3.0
PutAzureBlobStorage 1.3.0
PutAzureEventHub 1.3.0
PutCassandraQL 1.3.0
PutCloudWatchMetric 1.3.0
PutCouchbaseKey 1.3.0
PutDatabaseRecord 1.3.0
PutDistributedMapCache 1.3.0
PutDynamoDB 1.3.0
PutElasticsearch 1.3.0
PutElasticsearch5 1.3.0
PutElasticsearchHttp 1.3.0
PutElasticsearchHttpRecord 1.3.0
PutEmail 1.3.0
PutFile 1.3.0
PutFTP 1.3.0
PutGCSObject 1.3.0
PutHBaseCell 1.3.0
PutHBaseJSON 1.3.0
PutHDFS 1.3.0
PutHiveQL 1.3.0
PutHiveStreaming 1.3.0
PutHTMLElement 1.3.0
PutIgniteCache 1.3.0
PutJMS 1.3.0
PutKafka 1.3.0
PutKinesisFirehose 1.3.0
PutKinesisStream 1.3.0
PutLambda 1.3.0
PutMongo 1.3.0
PutParquet 1.3.0
PutRiemann 1.3.0
PutS3Object 1.3.0
PutSFTP 1.3.0
PutSlack 1.3.0
PutSNS 1.3.0
PutSolrContentStream 1.3.0
PutSplunk 1.3.0
PutSQL 1.3.0
PutSQS 1.3.0
PutSyslog 1.3.0
PutTCP 1.3.0
PutUDP 1.3.0
PutWebSocket 1.3.0
QueryCassandra 1.3.0
QueryDatabaseTable 1.3.0
QueryDNS 1.3.0
QueryElasticsearchHttp 1.3.0
QueryRecord 1.3.0
QueryWhois 1.3.0
ReplaceText 1.3.0
ReplaceTextWithMapping 1.3.0
ResizeImage 1.3.0
RouteHL7 1.3.0
RouteOnAttribute 1.3.0
RouteOnContent 1.3.0
RouteText 1.3.0
ScanAttribute 1.3.0
ScanContent 1.3.0
ScrollElasticsearchHttp 1.3.0
SegmentContent 1.3.0
SelectHiveQL 1.3.0
SetSNMP 1.3.0
SplitAvro 1.3.0
SplitContent 1.3.0
SplitJson 1.3.0
SplitRecord 1.3.0
SplitText 1.3.0
SplitXml 1.3.0
SpringContextProcessor 1.3.0
StoreInKiteDataset 1.3.0
TailFile 1.3.0
TransformXml 1.3.0
UnpackContent 1.3.0
UpdateAttribute 1.3.0
UpdateCounter 1.3.0
UpdateRecord 1.3.0
ValidateCsv 1.3.0
ValidateXml 1.3.0
Wait 1.3.0
YandexTranslate 1.3.0
Controller Services
AvroReader 1.3.0
AvroRecordSetWriter 1.3.0
AvroSchemaRegistry 1.3.0
AWSCredentialsProviderControllerService 1.3.0
CouchbaseClusterService 1.3.0
CSVReader 1.3.0
CSVRecordSetWriter 1.3.0
DBCPConnectionPool 1.3.0
DistributedMapCacheClientService 1.3.0
DistributedMapCacheServer 1.3.0
DistributedSetCacheClientService 1.3.0
DistributedSetCacheServer 1.3.0
FreeFormTextRecordSetWriter 1.3.0
GCPCredentialsControllerService 1.3.0
GrokReader 1.3.0
HBase_1_1_2_ClientMapCacheService 1.3.0
HBase_1_1_2_ClientService 1.3.0
HiveConnectionPool 1.3.0
HortonworksSchemaRegistry 1.3.0
IPLookupService 1.3.0
JettyWebSocketClient 1.3.0
JettyWebSocketServer 1.3.0
JMSConnectionFactoryProvider 1.3.0
JsonPathReader 1.3.0
JsonRecordSetWriter 1.3.0
JsonTreeReader 1.3.0
PropertiesFileLookupService 1.3.0
ScriptedLookupService 1.3.0
ScriptedReader 1.3.0
ScriptedRecordSetWriter 1.3.0
SimpleCsvFileLookupService 1.3.0
SimpleKeyValueLookupService 1.3.0
StandardHttpContextMap 1.3.0
StandardSSLContextService 1.3.0
XMLFileLookupService 1.3.0
Reporting Tasks
AmbariReportingTask 1.3.0
ControllerStatusReportingTask 1.3.0
DataDogReportingTask 1.3.0
MonitorDiskUsage 1.3.0
MonitorMemory 1.3.0
ScriptedReportingTask 1.3.0
SiteToSiteBulletinReportingTask 1.3.0
SiteToSiteProvenanceReportingTask 1.3.0
SiteToSiteStatusReportingTask 1.3.0
StandardGangliaReporter 1.3.0
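As a rough illustration of how components like these are chained into a directed graph, here is a small Python sketch. It only borrows the idea behind `UpdateAttribute` and `RouteOnAttribute`; the functions `update_attribute`, `route_on_attribute`, and `run_flow`, the dict-based FlowFile shape, and the routing rules are all assumptions for the example and are not the NiFi API.

```python
def update_attribute(flowfile, **new_attrs):
    """Return a copy of the flowfile with extra attributes added."""
    attrs = dict(flowfile["attributes"])
    attrs.update(new_attrs)
    return {"attributes": attrs, "content": flowfile["content"]}


def route_on_attribute(flowfile, rules):
    """Return the first relationship whose predicate matches, else 'unmatched'."""
    for relationship, predicate in rules.items():
        if predicate(flowfile["attributes"]):
            return relationship
    return "unmatched"


def run_flow(flowfiles):
    """Tag every flowfile, then route it to a sink by its mime.type attribute."""
    rules = {
        "json": lambda a: a.get("mime.type") == "application/json",
        "csv": lambda a: a.get("mime.type") == "text/csv",
    }
    sinks = {"json": [], "csv": [], "unmatched": []}
    for ff in flowfiles:
        ff = update_attribute(ff, source="demo")
        target = route_on_attribute(ff, rules)
        sinks[target].append(ff["attributes"]["filename"])
    return sinks
```

Each function plays the role of one node in the graph, and the relationship name returned by the routing step decides which downstream branch receives the data.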
An elegant interface
Personally, I think the interface is very good: it shows where the data is flowing in great detail.
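Because NiFi can process streaming data from many kinds of data sources, Hortonworks considers the HDF platform well suited to data processing for the Internet of Anything (IoAT). Data in HDF can flow in multiple directions, even point to point, and users can interact with the data streams being collected; that interaction can extend all the way to the data sources, such as sensors or devices. According to Hortonworks, HDF complements HDP: the former mainly handles data in motion, while the latter is built on Hadoop and mainly derives insight from data at rest. It is worth looking at how Hortonworks positions HDF in its official marketing, which already claims end-to-end streaming data processing and analytics.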
Hortonworks DataFlow (HDF) provides the only end-to-end platform that collects, curates, analyzes and acts on data in real-time, on-premises or in the cloud, with a drag-and-drop visual interface. HDF is an integrated solution with Apache Nifi/MiNifi, Apache Kafka, Apache Storm and Druid.
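The figure above gives an overview of HDF's three major parts: the HDF streaming data analytics platform includes Data Flow Management, Stream Processing, and Enterprise Services. NiFi handles data management and ingestion, and an important capability is that it can be deployed out to edge gateways.
If your project has a similar need to process many data sources, NiFi is a good choice.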