読者です 読者をやめる 読者になる 読者になる

ゆうなんとかさんの雑記帳的な。

Twitterで踊ったり音ゲーしたりしてるあの名前がよくわからない人が書いてるらしいよ。

HttpWebRequestでデータを落としてきてる時に進捗を表示させたかった

ReactiveExtentionと拡張メソッドで何とかなるようです。Rxすごい
neue cc - Reactive Extensions用のWebRequest拡張メソッド
ここにある拡張メソッドの定義を適当なソースコードにごっそりコピーして、

var req1 = WebRequest.Create("http://www.microsoft.com/taiwan/silverlight/images/1920X1080_i.jpg")
    .DownloadDataAsyncWithProgress(10000) // 引数は分割サイズ指定、無指定時は64K
    .Do(p => Console.WriteLine("{0}/{1} - {2}%", p.CurrentLength, p.TotalLength, p.Percentage)) // 進捗表示
    .Aggregate(new List<byte>(), (list, p) => { list.AddRange(p.Value); return list; }) // 分割されて届くbyte[]を一つにまとめる
    .Select(l => (Image)new ImageConverter().ConvertFrom(l.ToArray())) // バイト配列をImageに変換
    .Subscribe(img => img.Save("C:\\test.jpg")); // 画像保存

と唱えると進捗を表示してくれるそうです。ほかはRxの標準メソッドなので、DownloadDataAsyncWithProgressの定義を見てみましょう。

public static IObservable<Progress<byte[]>> DownloadDataAsyncWithProgress(this WebRequest request, int chunkSize = 65536)
{
   return Observable.Defer(() => request.GetResponseAsObservable()).SelectMany(r => r.DownloadDataAsyncWithProgress(chunkSize));
}

1行です。まったく、これだからLINQは…ほかの拡張メソッドも見てみましょう。

public static IObservable<Progress<byte[]>> DownloadDataAsyncWithProgress(this WebResponse response, int chunkSize = 65536)
{
   return Observable.Defer(() => response.GetResponseStream().ReadAsync(chunkSize))
       .Finally(() => response.Close())
       .Scan(Progress.Create(new byte[0], 0, 0),
             (p, bytes) => Progress.Create(bytes, p.CurrentLength + bytes.Length, response.ContentLength));
}

ここが重要そうですね。チャンクのサイズでストリームを読み取り、Progressクラスに進捗状況をまとめているようです。それにしても、Scanは標準でほしいですね。

public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest request)
{
    return ObservableForCompatible.Create<WebResponse>(observer =>
    {
        var disposable = new BooleanDisposable();
 
        Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, ar =>
        {
            var res = request.EndGetResponse(ar);
            if (disposable.IsDisposed) res.Close();
            return res;
        })().Subscribe(observer);
 
        return disposable;
    });
}

こちらはObservableな世界に乗り込めるようにするための拡張メソッドのようです。メソッド全体ではIObservableを返すんですが、ラムダ式のreturnが超気持ち悪い。。。きっとCreateでRx側がいい感じに値をまとめてるんでしょうね…

ちなみにこのライブラリ(?)、ちょっと手直しが必要です

実は警告が出ます。なぜかというと、非同期系の処理ではBegin~とEnd~のセットじゃなくて、~Asyncを使いましょうというところに向かっているからです。

FromAsyncPatternがObsolete

はい、Obsoleteです。理由としては、.NET 4.5では多くのメソッドがBegin-Endパターンの代わりにTaskを返すXxxAsyncメソッドを持っています。そしてTaskとIObservableは相互に変換可能だから、Rxで扱いたいならXxxAsync().ToObservableすればいいでしょ、ということでした。Begin-EndパターンなのにXxxAsyncを持っていないメソッドにはどうするんだ!という場合は、TaskFactory.FromAsyncがあるので、それ使えばいい、とのこと。まあ、それはレアケースなので滅多にないかな。

neue cc - Reactive Extensions v1.1.11011.11リリースに見る.NET 4.5からの非同期処理

なので、上の3つ目「GetResponseAsObservable」をこう書き換えてみました。

public static IObservable<WebResponse> GetResponseAsObservable(this WebRequest request)
{
    return ObservableForCompatible.Create<WebResponse>(observer =>
    {
        var disposable = new BooleanDisposable();

        request.GetResponseAsync().ToObservable().
            Select(s => { if (disposable.IsDisposed) s.Close(); return s; }).
            Subscribe(observer);

        return disposable;
    });
}

ほかの箇所も同じように書き換えると警告はなくなります。幸い数はそれほど多くないので、サクッと書き換えられます。挙動を考えればこれでいいはず…