使用LINQ運算符查詢可觀察序列


  在與現有.NET事件橋接時,我們已將現有.NET事件轉換為可觀察序列以訂閱它們。 在本主題中,我們將觀察可觀察序列的一類本質作為IObservable 對象,其中通用的LINQ操作符由Rx程序集提供以操作這些對象。 大多數運算符采用可觀察序列並對其執行一些邏輯並輸出另一個可觀察序列。 此外,從代碼示例中可以看出,您甚至可以在源序列上鏈接多個運算符,以根據您的確切要求調整結果序列。


使用不同的運算符


  我們已經使用瞭前面主題中的Create和Generate運算符來創建和返回簡單序列。 我們還使用FromEventPattern運算符將現有的.NET事件轉換為可觀察的序列。 在本主題中,我們將使用Observable類型的其他靜態LINQ運算符,以便可以對數據進行過濾,分組和轉換。 這樣的算子將可觀察序列作為輸入,並產生可觀測序列作為輸出。


組合不同的序列


  在本節中,我們將研究一些將各種可觀察序列組合成單個可觀察序列的運算符。 請註意,當我們組合序列時,數據不會被轉換。

  在下面的示例中,我們使用Concat運算符將兩個序列組合成一個序列並訂閱。 為瞭說明的目的,我們將使用非常簡單的Range(x,y)運算符來創建一個以x開頭的整數序列,然後生成y個序列號。


var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Concat(source2)
.Subscribe(Console.WriteLine);
Console.ReadLine();

  

  註意,結果序列是1,2,3,1,2,3。這是因為當您使用Concat運算符時,第二個序列(source2)直到第一個序列(source1)完成推送其所有值後才會被激活。隻有在source1完成後,source2才會開始將值推送到結果序列。然後,訂戶將從結果序列中獲得所有值。

  將此與合並運算符進行比較。如果運行以下示例代碼,您將獲得1,1,2,2,3,3。這是因為兩個序列同時是活動的,並且值在源中發生時被推出。結果序列僅在最後一個源序列完成推送值時完成。

註意,為瞭合並工作,所有源可觀察序列需要具有相同類型的IObservable 。所得到的序列將具有類型IObservable 。如果source1在序列中間產生一個OnError,那麼結果序列將立即完成。


var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Merge(source2)
.Subscribe(Console.WriteLine);
Console.ReadLine();

可以使用Catch運算符進行另一個比較。 在這種情況下,如果source1完成沒有任何錯誤,則source2將不會啟動。 因此,如果運行以下示例代碼,您將隻得到1,2,3,因為source2(它產生4,5,6)被忽略。


var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(4, 3);
source1.Catch(source2)
.Subscribe(Console.WriteLine);
Console.ReadLine();

  最後,讓我們來看看OnErrorResumeNext。 此操作符將移動到source2,即使source1由於錯誤而無法完成。 在以下示例中,即使source1表示以異常(通過使用Throw運算符)終止的序列,訂閱方將接收由source2發佈的值(1,2,3)。 因此,如果您期望源序列產生任何錯誤,則更安全的是使用OnErrorResumeNext來保證訂戶仍然會收到一些值。


var source1 = Observable.Throw<int>(new Exception("An error has occurred."));
var source2 = Observable.Range(4, 3);
source1.OnErrorResumeNext(source2)
.Subscribe(Console.WriteLine);
Console.ReadLine();

註意,對於所有這些組合運算符工作,所有可觀察的序列需要是相同類型的T.


投影


Select操作符可以將可觀察序列的每個元素轉換為另一種形式。

在下面的例子中,我們分別將整數序列投影成長度為n的字符串。


var seqNum = Observable.Range(1, 5);
var seqString = from n in seqNum
select new string('*', (int)n);
seqString.Subscribe(str => { Console.WriteLine(str); });
Console.ReadKey();

在下面的示例中,它是.NET事件轉換示例的擴展,我們在“使用現有.NET事件橋接”主題中看到,我們使用選擇運算符將IEventPattern 數據類型投影到Point類型中。 這樣,我們將鼠標移動事件序列轉換為可以進一步解析和處理的數據類型,如下一個“過濾”部分所示。


var frm = new Form();
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
select evt.EventArgs.Location;
points.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

  最後,讓我們看看SelectMany運算符。 SelectMany運算符有很多重載,其中一個接受選擇器函數參數。這個選擇器函數在由source observable推出的每個值上調用。對於這些值中的每一個,選擇器將它投射到迷你可觀察序列中。最後,SelectMany操作符將所有這些迷你序列平坦化為單個結果序列,然後將其推送到訂閱者。

  SelectMany返回的observable在源序列之後發佈OnCompleted,並且選擇器生成的所有mini observable序列都已完成。當源流中發生錯誤時,選擇器函數拋出異常時,或者在任何mini observable序列中發生錯誤時,將觸發OnError。

  在下面的示例中,我們首先創建一個源序列,每5秒產生一個整數,然後決定采用生成的前2個值(通過使用Take運算符)。然後,我們使用SelectMany來使用另一個{100,101,102}序列來投影這些整數。通過這樣做,產生兩個小的可觀察序列,{100,101,102}和{100,101,102}。這些最終被平坦化成{100,101,102,100,101,102}的單個整數流,並被推送到觀察者。


var source1 = Observable.Interval(TimeSpan.FromSeconds(5)).Take(2);
var proj = Observable.Range(100, 3);
var resultSeq = source1.SelectMany(proj);

var sub = resultSeq.Subscribe(x => Console.WriteLine("OnNext : {0}", x.ToString()),
ex => Console.WriteLine("Error : {0}", ex.ToString()),
() => Console.WriteLine("Completed"));
Console.ReadKey();

過濾


在下面的示例中,我們使用主要的運算符創建一個簡單的可觀察數字序列。 主要的運算符有幾個重載。 在我們的示例中,它采用初始狀態(在我們的示例中為0),終止的條件函數(少於10次),迭代器(+1),結果選擇器(當前值的平方函數)。 ,並使用Where和Select運算符僅打印小於15的值。


IObservable<int> seq = Observable.Generate(0, i => i < 10, i => i + 1, i => i * i);
IObservable<int> source = from n in seq
where n < 5
select n;
source.Subscribe(x => {Console.WriteLine(x);}); // output is 0, 1, 4, 9
Console.ReadKey();

以下示例是您在本主題前面看到的投影示例的擴展。 在該示例中,我們使用Select運算符將IEventPattern 數據類型投影到Point類型中。 在下面的示例中,我們使用Where和Select運算符僅選擇我們感興趣的那些鼠標移動。 在這種情況下,我們將鼠標移動到第一平分線上(其中x和y坐標相等)。


var frm = new Form();
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
select evt.EventArgs.Location;
var overfirstbisector = from pos in points
where pos.X == pos.Y
select pos;
var movesub = overfirstbisector.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

基於時間的操作


您可以使用Buffer運算符來執行基於時間的操作。

緩沖可觀察序列意味著可觀察序列的值基於指定的時間跨度或計數閾值被放入緩沖器中。這在您期望大量數據被序列推出並且訂戶沒有用於處理這些值的資源的情況下尤其有用。通過基於時間或計數緩沖結果,並且當超過標準時(或當源序列已經完成時),僅返回值序列,訂戶可以以其自己的速度處理OnNext調用。

在下面的示例中,我們首先為每秒創建一個簡單的整數序列。然後,我們使用Buffer運算符並指定每個緩沖區將保存來自序列的5個項目。當緩沖區已滿時,調用OnNext。然後,我們使用Sum運算符計算緩沖區的總和。緩沖區將自動刷新,並開始另一個循環。打印輸出將為10,35,60 …其中10 = 0 + 1 + 2 + 3 + 4,35 = 5 + 6 + 7 + 8 + 9,依此類推。


var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(5);
bufSeq.Subscribe(values => Console.WriteLine(values.Sum()));
Console.ReadKey();

我們還可以創建一個具有指定時間跨度的緩沖區。 在以下示例中,緩沖區將保存已累積3秒鐘的項目。 打印輸出將為3,12,21 …其中3 = 0 + 1 + 2,12 = 3 + 4 + 5,依此類推。


var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(TimeSpan.FromSeconds(3));
bufSeq.Subscribe(value => Console.WriteLine(value.Sum()));
Console.ReadKey();

註意,如果你使用Buffer或Window,你必須確保序列在過濾之前不為空。


LINQ操作符


LINQ Operators by Categories主題列出瞭Observable類型按其類別實現的所有主要LINQ操作符; 具體地:創建,轉換,組合,功能,數學,時間,異常,雜項,選擇和原語。

0 個評論

要回覆文章請先登錄註冊