HttpWebRequestでデータを落としてきてる時に進捗を表示させたかった
ReactiveExtentionと拡張メソッドで何とかなるようです。Rxすごい
neue cc - Reactive Extensions用のWebRequest拡張メソッド
ここにある拡張メソッドの定義を適当なソースコードにごっそりコピーして、
var req1 = WebRequest.Create("http://www.microsoft.com/taiwan/silverlight/images/1920X1080_i.jpg") .DownloadDataAsyncWithProgress(10000) // 引数は分割サイズ指定、無指定時は64K .Do(p => Console.WriteLine("{0}/{1} - {2}%", p.CurrentLength, p.TotalLength, p.Percentage)) // 進捗表示 .Aggregate(new List<byte>(), (list, p) => { list.AddRange(p.Value); return list; }) // 分割されて届くbyte[]を一つにまとめる .Select(l => (Image)new ImageConverter().ConvertFrom(l.ToArray())) // バイト配列をImageに変換 .Subscribe(img => img.Save("C:\\test.jpg")); // 画像保存
と唱えると進捗を表示してくれるそうです。ほかはRxの標準メソッドなので、DownloadDataAsyncWithProgressの定義を見てみましょう。
public static IObservable<Progress<byte[]>> DownloadDataAsyncWithProgress(this WebRequest request, int chunkSize = 65536) { return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadDataAsyncWithProgress(chunkSize)); }
1行です。まったく、これだからLINQは…ほかの拡張メソッドも見てみましょう。
public static IObservable<Progress<byte[]>> DownloadDataAsyncWithProgress(this WebResponse response, int chunkSize = 65536) { return Observable.Defer(() => response.GetResponseStream().ReadAsync(chunkSize)) .Finally(() => response.Close()) .Scan(Progress.Create(new byte[0], 0, 0), (p, bytes) => Progress.Create(bytes, p.CurrentLength + bytes.Length, response.ContentLength)); }
ここが重要そうですね。チャンクのサイズでストリームを読み取り、Progressクラスに進捗状況をまとめているようです。それにしても、Scanは標準でほしいですね。
public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest request) { return ObservableForCompatible.Create<WebResponse>(observer => { var disposable = new BooleanDisposable(); Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, ar => { var res = request.EndGetResponse(ar); if (disposable.IsDisposed) res.Close(); return res; })().Subscribe(observer); return disposable; }); }
こちらはObservableな世界に乗り込めるようにするための拡張メソッドのようです。メソッド全体ではIObservable
ちなみにこのライブラリ(?)、ちょっと手直しが必要です
実は警告が出ます。なぜかというと、非同期系の処理ではBegin~とEnd~のセットじゃなくて、~Asyncを使いましょうというところに向かっているからです。
neue cc - Reactive Extensions v1.1.11011.11リリースに見る.NET 4.5からの非同期処理FromAsyncPatternがObsolete
はい、Obsoleteです。理由としては、.NET 4.5では多くのメソッドがBegin-Endパターンの代わりにTaskを返すXxxAsyncメソッドを持っています。そしてTaskとIObservableは相互に変換可能だから、Rxで扱いたいならXxxAsync().ToObservableすればいいでしょ、ということでした。Begin-EndパターンなのにXxxAsyncを持っていないメソッドにはどうするんだ!という場合は、TaskFactory.FromAsyncがあるので、それ使えばいい、とのこと。まあ、それはレアケースなので滅多にないかな。
なので、上の3つ目「GetResponseAsObservable」をこう書き換えてみました。
public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest request) { return ObservableForCompatible.Create<WebResponse>(observer => { var disposable = new BooleanDisposable(); request.GetResponseAsync().ToObservable(). Select(s => { if (disposable.IsDisposed) s.Close(); return s; }). Subscribe(observer); return disposable; }); }
ほかの箇所も同じように書き換えると警告はなくなります。幸い数はそれほど多くないので、サクッと書き換えられます。挙動を考えればこれでいいはず…