[Azure]_使用Azure Event Hubs – 打造即時log紀錄功能
本文翻譯&整理&擴充專案做法原文來自:http://fabriccontroller.net/blog/posts/getting-started-azure-service-bus-event-hubs-building-a-real-time-log-stream/
Azure服務系統架構圖來自:http://msdn.microsoft.com/en-us/library/azure/dn836025.aspx
Picasa圖床怪怪的,圖片變得不是很清楚,要看大圖請到:https://picasaweb.google.com/112900880353798406790/AZUREEVENTHUBSAMPLE1PIC02
在2014 WPC,微軟發表Azure Event Hubs 的服務,這是Azure Service Bus的一部分。
Event Hubs很適合給IoT以及資料採集處理的情境使用。
而為何需要Event Hub?,當你有大量的client回傳資料時,可以用Event Hub當作這些大量Client的Buffer,交給Azure接收之後,後端再去消化讀取處理,減少大量Client送資料而主機無法負荷的情況產生。
並且也適合SQL SERVER與CLIENT身處不同網域,但因為資料保全政策不能夠允許CLIENT跟目標的SQL SERVER直接連線。這時也是可以運用Event Hub來當中繼轉接。
而Event Hub有個資料儲存機制稱之為”Partition”,一個Event Hub在建立的時候需要指定你的Partition需要多少個。
- 建立完之後就不能再改,所以請確定你的應用情境後端的資料平行化數量需求為多少,最大為32(要更大可以跟ms聯絡)
- 資料在Partition存活的時間也可以調整,不過你不能指定去刪除他,唯有存活時間到或者讀取出才會被刪除。
而Partition可以視為多個Queue,運作概念很像,但資料進入的優先權處裡的方式略有不同,Service Bus底下的Queue跟Topics運作方式為Competing Consumers Pattern
也就是說在Queue out取的資料的那端,若有多個程序需要存取相同Queue的內容,就會增加處理資料程序的複雜度。
而Event Hub採取 Partitioned consumer pattern ,也就是擁有多個Queue,然後平均分散地把資料丟到不同的Partition(當然,也可以指定資料要進去哪個Partition,不過當然是不建議這樣)。
這樣就能達成後端拿取資料的程序能夠平行化的處理以及取得大量資料。
對Partition有特別指定要怎麼去指派,以及如何讓多個Consumer拿取指定的Partition的概念方式可以參考MSDN說明:
http://msdn.microsoft.com/en-us/library/azure/dn836025.aspx
在Consumer groups的區塊,會告訴你大略如何新增更多後端處理資料的Consumer
在Stream Offsets的區塊,會告訴你在Partition裡面的資料除了FIFO,還可以指定Offsets去取得其他時間點的資料。
另外需要特別注意的是單筆資料(MSDN上面稱作Event Data)大小不可超過256kb,
而一個運輸單位(可以在EventHub的Scale頁面調整,最大20個,當然…不夠就要聯絡ms),流入流量最大每秒1MB 或者每秒1000筆事件,流出流量最大為每秒2MB。
/////上面基礎知識講完了,下面正題////
原文的範例是要利用asp .net mvc 5建造一個網站,其中擁有一個可以收集連線者的連線資訊(環境,client端的作業系統及瀏覽器資訊),並且把這些資訊回傳至指定的Event Hub。
除了傳送至Event Hub,也有一個簡單利用C#寫的CONSOLE程式能夠從Event Hub拉資料下來檢視(當然也可以進一步的做處理)
再開始之前,先定義幾個名詞:
- 傳送的Client(在MSDN下面稱作Event Publisher),例如這個範例是由asp網站寫的TraceListener,每當有Client連線到該網站,Client的瀏覽器就會傳一份資料給Event Hub,而在SAP(Share Access Policies)底下的權限稱作”Send”,而在IoT的情境下這些傳送的Client想當然就是一堆Sensor了。
- 接受資料的對象(本文代稱Processor,在MSDN下面稱作Event Consumer),就是從Event Hub拉資料下來的腳色,大部分的情況是一隻程式,可以處理Event Hub所QUEUE住的資料,或者DATA MINING的程式。
注意,這兩者之間是有夾著Event Hub的,資料路徑務必釐清。
Client->Event Hub->Processor
接下來要為等一下的SAMPLE建置AZURE環境:
▼ 看圖說故事,1.左邊的Service Bus,2.新增,3.填妥相關資訊,注意Type是為Messaging,4.建立新的Service BUS
▼1.對新的Service Bus,2.點選Event Hub,3.選建立新的EventHub,4.自訂建立
▼依照本次範例,請注意Event Hub Name,需要符合等下專案的名稱。
▼接下來這個步驟也很重要,前面提到PARTITION COUNT在建立完之後不能變更,需要注意,另外MESSAGE RETENTION代表訊息會在Partition Queue裡面存活多久,預設是1Day
▼建立好之後,1.按下剛剛建立好的Event Hub “log”,2.按下Configure,3.要設定SAP,新增LogSender跟LogProcessor,型態分別為Send跟Listen,這兩個名稱分別代表Publisher以及Consumer使用的存取規則,也會對應著等下sample裡面的code。
▼下方的SAKG,可以看到藍色箭頭現在是LogSender,而對應著一組Key,LogProcessor也會對應一組Key,請按下綠色箭頭去切換,對應的這兩組Key等下會用到。
接著就是跑sample的部分,可以參考原文作者的GitHub:
https://github.com/sandrinodimattia/EventHubsDemo
整個專案抓下來後,可以看到傳送瀏覽網頁的使用者的資訊到EventHub所使用的語法class在EventHubsTraceListener.cs底下,原文作者已經有針對這個Class做解說
public EventHubsTraceListener() { MessagingFactory factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", ConfigurationManager.AppSettings["ServiceBus.Namespace"], ""), new MessagingFactorySettings() { TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(ConfigurationManager.AppSettings["ServiceBus.KeyName"], ConfigurationManager.AppSettings["ServiceBus.Key"]), TransportType = TransportType.Amqp }); _client = factory.CreateEventHubClient("Logs"); // Event information. _instanceId = Environment.GetEnvironmentVariable("WEBSITE_INSTANCE_ID") ?? DateTime.Now.Ticks.ToString(CultureInfo.InvariantCulture); _siteName = Environment.GetEnvironmentVariable("WEBSITE_SITE_NAME") ?? "CorporateWebApp"; }
比較需要注意的是它裡面用了許多的ConfigurationManager去儲存一些連線EventHub用的資訊,例如Event Hub Name跟Key等等…
關於ConfigurationManager的詳細內容可以參考:
http://www.cnblogs.com/LoveJenny/archive/2011/07/11/2103419.html
另外這裡因為是ASP.NET, 所以可以使用Service BUS的SDK,所以這裡的傳送協定採用AMQP,不過Publisher可以使用HTTP跟AMQP協定,但在更換的時候必須要注意:MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri(“https“, ConfigurationManager.AppSettings[“ServiceBus.Namespace”], “”)…(下略)
TransportType = TransportType.Amqp
那麼要怎麼填入自己的Event Hub資訊呢?
因為透過ConfigurationManager,所以她是放在Web.config裡面
<appSettings> <add key="webpages:Version" value="3.0.0.0" /> <add key="webpages:Enabled" value="false" /> <add key="ClientValidationEnabled" value="true" /> <add key="UnobtrusiveJavaScriptEnabled" value="true" /> <add key="ServiceBus.Namespace" value="DXRDAA02"/> <add key="ServiceBus.KeyName" value="LogSender"/> <add key="ServiceBus.Key" value="23l20ryyTax6PcrQMrKaviJnIF+nkSyG93bHRrV2RF8="/> </appSettings>
解說一下,會需要改的只有倒數三行
- ServiceBus.Namespace,這個是填入你ServiceBus的開頭名稱
- ServiceBus.KeyName,因為這個網站是擔任將瀏覽端的資訊Send給Event Hub, 因此就算Publisher的腳色
- ServiceBus.Key,對應剛剛Azure頁面LogSender的Key填入即可。
然後發行到你的IIS Server,或者部屬到Azure WebSite都可以。
接著要來看同方案底下的另外一個專案:EventHubsDemo.Processor
private static void Main(string[] args) { var cts = new CancellationTokenSource(); for (int i = 0; i <= 7; i++) { Task.Factory.StartNew((state) => { Console.WriteLine("Starting worker to process partition: {0}", state); var factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", ConfigurationManager.AppSettings["ServiceBus.Namespace"], ""), new MessagingFactorySettings() { TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(ConfigurationManager.AppSettings["ServiceBus.KeyName"], ConfigurationManager.AppSettings["ServiceBus.Key"]), TransportType = TransportType.Amqp }); var client = factory.CreateEventHubClient("Logs"); var group = client.GetDefaultConsumerGroup(); Console.WriteLine("Group: {0}", group.GroupName); var receiver = group.CreateReceiver(state.ToString(), DateTime.UtcNow); while (true) { if (cts.IsCancellationRequested) { receiver.Close(); break; } // Receive could fail, I would need a retry policy etc... var messages = receiver.Receive(10); foreach (var message in messages) { var logMessage = Newtonsoft.Json.JsonConvert.DeserializeObject<LogMessageEvent>(Encoding.Default.GetString(message.GetBytes())); Console.WriteLine("{0} [{6}] {2}/{3}: {5}", DateTime.Now, message.PartitionKey, logMessage.MachineName, logMessage.SiteName, logMessage.InstanceId, logMessage.Value, state); Console.WriteLine(" > Instance/PartitionKey: {0}", message.PartitionKey); } } }, i); } Console.ReadLine(); cts.Cancel(); }
主體是一個for迴圈,創立8個task來達到平行化處理(或者模擬多個後端處理程序),這裡會說是8個其實就是對應前面說有8個Partition。
可以看到在Event Consumer這邊,必須要走AMQP協定來拿資料。
TransportType = TransportType.Amqp
如果你想要區別多個WorkLoad Group,可以在Azure設定完之後在這裡指定Group的Name,這邊會是預設這個Event Hub只有一個WorkLoad Group且自動的幫你安排資料進出的Partition.
var group = client.GetDefaultConsumerGroup();
另外一樣,這段code的連線資料都放在App.config底下
<appSettings> <add key="ServiceBus.Namespace" value="DXRDAA01"/> <add key="ServiceBus.KeyName" value="LogProcessor"/> <add key="ServiceBus.Key" value="04w5CZ1M+v7Hjy076vt2klUJzzo8Ujj3NlfB1JG7zN0="/> </appSettings>
這三個Key-value剛剛解釋過了,比較要改的是KeyName跟對應的key
KeyName因為這邊是接收的Consumer,所以這裡要改成剛剛在Azure設定SAP的LogProcessor這條規則。
接著要改Share裡面的App.config
<appSettings> <!-- Service Bus specific app setings for messaging connections --> <!--<add key="Microsoft.ServiceBus.ConnectionString" value="Endpoint=sb://[your namespace].servicebus.windows.net;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[your secret]" />--> <add key="Microsoft.ServiceBus.ConnectionString" value="Endpoint=sb://dxrdaa01.servicebus.windows.net;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=hq7oGVxZo58BEzr4JtnOFE6PLm+2hSQJzRi0Clg36Z8=" /> </appSettings>
這就是你ServiceBus的key,我把原本在Project內的註解留下來對應。
▼在這個地方拿到你的Service Bus Key
▼ 但其實這個頁面可以拿到整個Endpoint,就不用手改了
儲存後,網站上線ok,然後就可以開啟這個Processor,當有人瀏覽你的網頁時,就會有資料從Event Hub取得顯示出來
就完成這整個sample的deploy~
若想要知道從零BUILD起,可以參考微軟說明文章,裡面有幾個比較重要的CODE會出現(本文所用的範例也都有包括該文的內容)
http://msdn.microsoft.com/zh-tw/library/azure/dn789972.aspx
Leave a comment
很抱歉,必須登入網站才能發佈留言。