Rööpülesannete katkestamine

Rööpülesanded jooksevad üksteisest sõltumatult. Kui ühes ülesandes visatakse erand, ei takista see teistel ülesannetel edasi jooksmast. Erandid kogutakse kokku ja jõuavad pealõime.

Vaata seda näidet ühest kasutust programmist, kus iga lõim viskab poole täitmise pealt erandi:

static void Do()
{
  for (int i = 0; i < 100; i++)
    if (i >= 50)
      throw new OperationCanceledException();
}

static void Main(string[] args)
{
  try
  {
    Parallel.Invoke(Do, Do, Do);
  }
  catch (AggregateException x)
  {
    Console.WriteLine(x.InnerExceptions.OfType<OperationCanceledException>().Count() + " tasks canceled");
  }
}

Kõigi lõimede erandid kogutakse pealõime erandisse AggregateException. Ja selle InnerExceptions sisaldab meie näites täpselt sama palju erandeid nagu oli lõimesid. See tähendab, et iga lõim tegi oma tööd teistest sõltumatult ja jõudis oma tööga erandini.

Mida teha siis, kui üks lõim peaks saama teiste tegevuse katkestada? Näiteks, kui töös selgub viga, mis teeb mõttetuks kõigi teiste lõimede edasised pingutused. Teine variant on see, et kasutajal on “Loobu” nupp, millel klõpsates ta saab töö katkestada.

Siin tuleb appi CancellationTokenSource, mis on lõimeturvaline, spetsiaalselt selleks otstarbeks loodud objekt.

Meie järgmises näiteks on meil üldkasutatav muutuja

static CancellationTokenSource ts = new CancellationTokenSource();

ja igaüks (suvaline lõim või ka näiteks kasutaja, kes vajutab mingit nuppu) võib välja kutsuda ts.Cancel().

Ülesandeid luues on vaja kaasa anda Token:

Task.WaitAll
(
  Task.Factory.StartNew(Do, ts.Token),
  Task.Factory.StartNew(Do, ts.Token),
  Task.Factory.StartNew(Do, ts.Token)
);

Protseduur Do kasutab spetsiaalset meetodit ThrowIfCancellationRequested, mis kontrollib, kas ülesanded ei ole globaalselt katkestatud ja viskab erandi kui on.

static void Do()
{
  for (int i = 0; i < 100; i++)
  {
    ts.Token.ThrowIfCancellationRequested();
    if (i >= 50)
      ts.Cancel();
  }
}

Ülaltoodud näite tulemuseks on, et kõik ülesanded jõuavad katkestatud seisu, ent need katkestatakse hetkel, kui ükskõik milline lõimedest jõuab poole peale.

Mõned märkused

ThrowIfCancellationRequested vajab ressursse ja reaalelus ei oleks mõttekas tsükli igas sammus seda välja kutsuda. Näiteks kui tegemist on pikkade ülesannetega ja kasutaja vajutab katkestusnuppu, piisaks sellest, kui katkestamiseks kuluks üks sekund. Iga lõim peaks siis sel juhul kontrollima katkestust kord sekundis.

Teine asi on see, et ThrowIfCancellationRequested poolt visatavat erandit ei tohiks töödelda ülesandesiseselt, muidu see info ei jõua pealõimeni. Pealõimel on sellest kasu, sest Task-objekti Status seatakse sel juhul seisu TaskStatus.Canceled, mitte TaskStatus.Faulted nagu muude erandite puhul. Sel eesmärgil edastatakse Token ka Task.Factory.StartNew-meetodile.

Categories: Uncategorized

Erinevate ülesannete rööpkäivitamine

Seni oleme vaadanud, kuidas andmeid rööpselt töödelda. Aga tegelikult saab rööpselt käivitada täiesti erinevaid ülesandeid.

Toome näite: programmi käivitamisel on vaja internetist alla laadida 3 faili. Esimene asi, mis pähe kargab, on teha seda asünkroonselt WebClient.DownloadStringAsync abil. Pole paha mõte. Iga DownloadStringCompleted sündmus kontrollib, kas kõik failid on juba alla tulnud või on mõni veel puudu.

Parallel.Invoke()

Rööplemine on siin palju lihtsam:

string a = "", b = "", c = "";
      
Parallel.Invoke
(
  () => { a = new WebClient().DownloadString("http://1.com&quot;); },
  () => { b = new WebClient().DownloadString("http://2.com&quot;); },
  () => { c = new WebClient().DownloadString("http://3.com&quot;); }
);

Parallel.Invoke() võtab nii palju funktsioone, kui sa tahad anda, ja lõpetab siis, kui kõik need on valmis saanud. Antud juhtumil lõpetatakse siis pärast kõigi kolme stringi allalaadimist. Lihtne, kas pole?

Task.Factory

Teine tore loom on Task.Factory, mis võimaldab luua ülesandeid ühekaupa ja siis nende täitmist ootama jääda.

List<Task> tasks = new List<Task>();
tasks.Add(Task.Factory.StartNew(() => { a = new WebClient().DownloadString("http://1.com&quot;); }));
tasks.Add(Task.Factory.StartNew(() => { b = new WebClient().DownloadString("http://2.com&quot;); }));
tasks.Add(Task.Factory.StartNew(() => { c = new WebClient().DownloadString("http://3.com&quot;); }));
Task.WaitAny(tasks.ToArray(), TimeSpan.FromSeconds(5));

Task.Factory.StartNew() tagastab viivitamatult Task-objekti, käivitades ülesande taustal.

Task.WaitAll() võtab suvalise arvu ülesandeid (Task-objekte) ja ootab, kuni kõik on täidetud. Lisatingimusena võib anda ajalõpu limiidi.

Teine sarnane meetod on Task.WaitAny(), mis ootab, kuni ükskõik milline ülesannetest on täidetud. Selle tagastudes on tavaliselt täitunud ainult üks ülesannetest.

Jep, rööplemine on tõesti mõnus. Üldiselt ei ole see hädavajalik, sest teiste meetoditega võib saavutada samu tulemusi, kuid see on lihtne kasutada ja loogiliselt üles ehitatud. Ja loomulikult on selline kood palju paremini loetav.

Categories: Uncategorized

Rööpülesannete algväärtustamine

Rööplemine toimub eraldi lõimedes, ja juhtumitel, kui on tarvis ligi pääseda objektile, mis ei ole lõimeturvaline, peaks selle objekti lukustama, aga see võib viia olukorda, kus rööplemisest ei ole mingit tolku, kuna kõik lõimed istuvad ikka ühes järjekorras.

Siin tuleb abiks rööplõimede algväärtustamine, kus igale ülesandele saab luua oma objektid.

Kõige lihtsam näide, kus seda võib vaja minna, on juhuarvude generaator, mis ei ole lõimeturvaline.

double[] prices = new double[10000];

Parallel.For<Random>
(
  0,
  prices.Length,
  () => new Random(),
  (i, state, random) => { prices[i] = random.NextDouble(); return random; },
  random => { }
);

Parallel.For() võtab lisaks miinimum- ja maksimumväärtusele ka kolm funktsiooni.

Esimene käivitatakse iga ülesande alguses, teine iga tsükli jaoks ja kolmas ülesannet lõpetades.

Esimeses funktsioonis me loome Random-objekti (tõele au andes tuleks tunnistada, et see näide on liiga lihtsustatud, sest parema tulemuse jaoks peaks arvestama, et iga Random-objekt vajab erinevat seemet, kuna need luuakse peaaegu üheaegselt).

Teine funktsioon saab positsiooni, staatuse objekti ja ülesande jaoks algväärtustamisel loodud Random-objekti. Seda kasutatakse järgmise juhuarvu genereerimiseks ja massiivi salvestamiseks. Funktsioon peab tagastama järgmises tsüklis kasutatava objekti (meie juhtumil ei ole tarvis seda muuta).

Kolmas funktsioon oleks kasulik, kui Random-objekt vajaks lõpetuseks mingit protseduuri. Meie seda antud näites ei vaja.

Aggregate()

See on tore meetod, mida saab kasutada oma koondfunktsioonide loomiseks, kui sisseehitatutest ei piisa. Kujutan ette, et see kulub marjaks ära laenukalkulaatorite ja statistikaülesannete jaoks. Aga meie teeme midagi väga tavalist: võtame kõigi väärtuste keskmise.

double d = prices.AsParallel().Aggregate
(
  () => new double[2],
  (accum, element) => { accum[0] += element; accum[1]++; return accum; },
  (accumA, accumB) => { accumA[0] += accumB[0]; accumA[1] += accumB[1]; return accumA; },
  accum => accum[0] / accum[1]
);

Seda protseduuri ei saa teostada ülesannet algväärtustamata, sest me ei saa kõigile lõimedele anda ühiskasutatavat double[2]-muutujat. Niisiis vajab iga ülesanne oma muutujat.

Esimene funktsioon loob kohaliku muutuja (millele edaspidi viidatakse nimega accum).

Teine funktsioon on tsükli käitamiseks. Masiivi 0-positsioon on väärtuste summa ja 1-positsioon on väärtuste koguarv. Nagu For-meetodis, tuleb ka siin tagastada muutuja ise. Funktsiooni teine parameeter, element, on loomulikult tsüklis töödeldav sisendväärtus.

Kolmas funktsioon on see, mis käivitatakse kõiki erinevaid lõimesid kokku võttes. See koondab kõigi lõimede tulemused, antud juhul summeerib kõik väärtused, võttes kaks sisendparameetrit ja tagastades summa.

Neljas funktsioon käivitatakse üks kord: siis kui kõik lõimed on summeeritud. Siin me jagame summa väärtuste koguarvuga ja saamegi keskmise.

Kolm varianti

Selle ülesande saaks loomulikult teostada palju lihtsamalt: prices.Average() jadana ja prices.AsParallel().Average() rööpes.

Aga ma loodan, et see näide aitas mõista, mida on vaja, et teostada funktsiooni, mida standardvalikus ei leidu.

Muuseas, nagu võib arvata, ei tarvitse jadas ja rööpes teostatud koondfunktsioonide tulemused täielikult ühtida, sest rööpes tehakse kõik tööd kahes astmes (iga lõime jaoks eraldi ja seejärel koondatuna), mis annab erineva vea kui jadaülesandes.

Minu katse puhul tuli minu enda koodijupi ja AsParallel().Average() vastus sama, kuid jadaprogrammi vastus viimase numbrikoha osas pisut erinev:

0.497030457169437
0.497030457169437
0.497030457169439

See ongi oodatud tulemus, mis näitab, et rööpkoondfunktsioonid töötavad sisemiselt kaheastmelistena.

Categories: Uncategorized

Koondfunktsioonid ja pudelikael

LINQ koos oma koondfunktsioonidega on suurepärane asi. Rööpes on sama asi saadaval nime all PLINQ (Parallel LINQ). See kattub suuremas osas jada-LINQiga, seega on see asjaga tuttavale väga lihtne kasutada.

Ainuke küsimus, mis tekib, on see, kuhu kirjutada AsParallel(). Tooksin ühe keerulisema praktilise näite, millega saate katsetada.

Oletame, et meil on sotsiaalvõrgustik, kus igal inimesel võib olla sõpru, ja sul on vaja soovitada kasutajale uusi sõpru, pakkudes neid inimesi, kellega tal juba on ühiseid sõpru. Mida rohkem ühiseid sõpru, seda eespool pakutav nimekirjas on.

Nii, lihtne… aga et asja keerulisemaks teha, ei ole meie objekti struktuur kõige paremate killast. Teeme iga inimese kohta järgmise objekti:

class Person
{
public int Id;
public Person[] Friends;
}

Kasutame tavalist jadaprogrammi, et koostada 10 000 liikmega sotsiaalvõrgustik, kus igal liikmel on kuni 300 sõpra:

//tekitame 10000 isikut
Person[] persons = new Person[10000];
Random r = new Random(); 

//paneme neist igaühele 0 kuni 300 sõpra
for (int i = 0; i < persons.Length; i++)
persons[i] = new Person { Id = i + 1, Friends = new Person[r.Next(300)] };

//valime sõpradeks juhuslikud inimesed
foreach (Person person in persons)
for (int i = 0; i < person.Friends.Length; i++)
while (person.Friends[i] == null)
{
Person friend = persons[r.Next(persons.Length)];
//kontrollime, et sõber ei oleks mina ise ja et sõpru ei satuks topelt
if (friend != person && !person.Friends.Contains(friend))
person.Friends[i] = friend;
}

Jadaprogrammi kasutasin seepärast, et Random objekt ei ole lõimeturvaline. Muidu võiks seda ka rööbelda.

Võtame juhusliku inimese, kellele hakkame uusi sõpru soovitama:

//valime seltskonnast juhusliku isiku
Person me = persons[r.Next(persons.Length)];
Console.WriteLine(“Me: “ + me.Id + ” having “ + me.Friends.Count() + ” friends”);
int maxcount = 10;

Sain sellise, kellel oli 254 sõpra.

Tegin nüüd lähteviiteks jadaprogrammi, millega soovitused leida:

//siia salvestame potentsiaalsed sõbrad
Dictionary<Person, int> potential = new Dictionary<Person, int>(); 

//käime läbi kõik minu sõbrad
foreach (Person friend in me.Friends)
//käime läbi kõik inimesed
foreach (Person person in persons)
//kontrollime, et see ei oleks mina ise ega minu sõbrad
if (person != me && !me.Friends.Contains(person))
//kas minu sõber on tema sõber?
if (me.Friends.Contains(friend))
//lisame potentsiaalsete sõprade hulka
if (potential.ContainsKey(person))
potential[person]++;
else
potential[person] = 1;

//sorteerime ühiste sõprade kogused
List<int> quantities = new List<int>();
foreach (int quantity in potential.Values)
if (!quantities.Contains(quantity))
quantities.Add(quantity);
quantities.Sort();

//käime ühiste sõpradega inimesed läbi, alustades suurima ühisarvuga
int count = 0;
for (int i = quantities.Count – 1; i >= 0 && count < maxcount; i–)
foreach (Person person in potential.Keys)
if (potential[person] == quantities[i])
{
Console.WriteLine(person.Id + ” (“ + quantities[i] + ” mutual friends)”);
if (++count > maxcount)
break;
}

Uhh, päris pikk sai. Selle programmi läbimise ajaks võtame 100%.

Teeme sama asja LINQiga:

foreach
(
var person in
//käime läbi kõik minu sõbrad
me.Friends
//valime kõigi isikute hulgast need, kel on minuga ühiseid sõpru
.SelectMany(s => persons.Where(p => p != me && p.Friends.Contains(s)))
//välistame need, kes on juba minu sõbrad
.Where(w => !me.Friends.Contains(w))
//grupeerime isiku järgi
.GroupBy(g => g)
//tekitame uue grupeeritud objekti
.Select(p => new
{
Person = p.Key,
Count = p.Count()
})
//järjestame kahanevalt
.OrderByDescending(o => o.Count)
//võtame lubatud arvu
.Take(maxcount)
) Console.WriteLine(person.Person.Id + ” (“ + person.Count + ” mutual friends)”);

See jada-LINQ-programm võttis samade tulemustega 53% ajast (mis näitab, et minu jadaprogrammi saaks veel oluliselt optimeerida).

Ja kogu see krempel rööpes nägi välja sama, ainult et AsParallel() oli pandud sellisesse kohta: s => persons.AsParallel().Where.

Rööplemine võttis aega ainult 10% algsest jadaprogrammist.

Pane tähele, et AsParallel() on ainult ühes kohas, esimese filtri ees. Siin seisnebki rööplemise kunst: leida see õige koht, kuhu see maagiline sõna sisse kirjutada. Kirjuta see valesse kohta ja kogu protsess võib minna nii pikaks, et isegi pärast lõunat ei ole veel valmis.

Kui sa nüüd mõtlesid, et ma olen vana kala ja mul oli õige koha leidmine käkitegu, siis sa eksid. Ma katsetasin ikka kümneid variante, enne kui leidsin selle, mis on parim. See koht on loogiline, sest põhiline on leida inimesed, kellega on ühised sõbrad. Kuna neid inimesi on suhteliselt vähe, ei tasu rööplemine hiljem end enam ära.

Kas AsParallel() võiks olla mitmes kohas? Proovige ja katsetage, mina ei saanud ajavõitu, tegelikult hoopis kaotasin ajas. Põhjus on ilmselt selles, et lõimesid tuleb siis jagada mitme rööpe vahel ja see piirab pudelikaelale kättesaadavate ressursside arvu. Aga pudelikael peaks saama maksimaalse võimaliku.

Aga proovige, kas suudate seda värki veelgi optimeerida. Ja andke oma avastustest häbenemata teada.

Õpetus

Õpetus on ilmne: leia koodist üks pudelikael ja kasuta rööbet selles kohas ning efekt võib olla tohutu.

Rööpsusastme piiramine

Eelmises blogis vaatlesime, kuidas rööpe sektsioonimisega võib ehitada jadakoodi sektsioone ja piirata rööbete arvu. Tegelikult on .NETis olemas ka spetsiaalsed parameetrid.

double[] values = new double[10000];

List<int> threadids = new List<int>();

values.AsParallel().WithDegreeOfParallelism(5).ForAll(value =>
{
int id = System.Threading.Thread.CurrentThread.ManagedThreadId;
Console.WriteLine(id);
threadids.Add(id);
});

Console.WriteLine(threadids.Distinct().Count() + ” lõime”);

Ülaltoodud koodis näed sätet WithDegreeOfParallelism, mis minu katsete põhjal piirab kasutatavate lõimede arvu. Me lihtsalt salvestame kõigi lõimede id-d ja vaatame pärast, mitu erinevat oli. Antud juhtumil oligi see täpselt 5. Sisendväärtuse suurenedes üle 10 ei kasvanud kasutatavate lõimede arv üle 10.

double[] values = new double[10000];

List<int> threadids = new List<int>();

Parallel.For(0, values.Length, new ParallelOptions{ MaxDegreeOfParallelism = 2 }, (i) =>
{
int id = System.Threading.Thread.CurrentThread.ManagedThreadId;
Console.WriteLine(id);
threadids.Add(id);
});

threadids.GroupBy(g => g).AsParallel().ForAll(s => Console.WriteLine(s.Key + ” “ + s.Count()));

Teises näites on tulemus huvitavam. Siin anname kaasa ParallelOptions objekti ja selle MaxDegreeOfParallelism peaks tähendama igal ajahetkel maksimaalset töös olevate ülesannete (task) arvu. Tulemus on, et erinevate kasutatud lõimede arv on sisendväärtusest umbes 2 korda suurem. Sisendi 1 korral on see 1 (see on siis jadaprogramm), aga 2 korral 4 ja 5 korral 10. Kui palju neid korraga jooksis, seda ei tea, aga erinevate lõimede rakendatus oli üsna erinev (viimasel real olnud grupeerimist kasutades oli näha, et lõimede hõivatuse vahe oli kuni 5-kordne).

Kokkuvõtteks, WithDegreeOfParallelism tundub rangem, samas kui MaxDegreeOfParallelism on lõdvem ja näitab mingit venivat rööpsusastet.

Rööpe sektsioonimine

Väikese ressursinõudlusega rööpsilmuste puhul võib tekkida probleem, kus rööbete käivitamisele kulub rohkem auru kui on kasu nende tööst. Selle probleemi vältimiseks võimaldab rööp-C# rööbete sektsioonimist (inglise keeles partitioning).

Sektsioonimiseks tuleb teha using System.Collections.Concurrent.

Vaata alltoodud näidet, kus on kõrvutatud 3 varianti ruutjuure võtmiseks:

static void Main(string[] args)
{
double[] values = new double[0xFFFFFF];
Parallel.For(0, values.Length, i => values[i] = i); 

DateTime start;

//variant 1: jada
start = DateTime.Now;
foreach (double value in values)
{
Math.Sqrt(value);
};
Console.WriteLine((DateTime.Now – start).TotalMilliseconds.ToString(“0″));

//variant 2: automaatne rööplemine
start = DateTime.Now;
Parallel.ForEach(values, value =>
{
Math.Sqrt(value);
});
Console.WriteLine((DateTime.Now – start).TotalMilliseconds.ToString(“0″));

//variant 3: sektsioonitud rööplemine
start = DateTime.Now;
Parallel.ForEach(Partitioner.Create(0, values.Length, values.Length / 4), p =>
{
for (int i = p.Item1; i < p.Item2; i++)
Math.Sqrt(values[i]);
});
Console.WriteLine((DateTime.Now – start).TotalMilliseconds.ToString(“0″));
}

Esimene, nagu ise näed, on jadakood. See andis mul ajaks 256 millisekki.

Teine on tavaline rööplemine (ma pole küll ForEach()-meetodit varem käsitlenud, aga see on iseennast selgitav jadaprogrammi foreach analoog). Teises variandis käivatatakse nii palju rööpeid kui torust tuleb, ja kuna silmuse sisu on suhteliselt lihtne, võib rööplemise organiseerimisele kuluda liigselt aega. Teise variandi puhul sain ma ajaks 64 millisekki.

Kolmandas variandis loome me esiteks sektsioonija (Partitioner), mille sektsioone saab ForEach()-abil kasutada. Iga sektsiooni sees käivitame jadaprogrammi, aga sektsioone endid käsitleme rööpes.

Partititioner.Create() võtab 3 argumenti: algväärtus, lõppväärtus ja ühe sektsiooni maksimupikkus. Antud juhtumil on kolmandaks argumendiks antud values.Length / 4 ehk siis tulemuseks on, et meil tekitatakse 4 sektsiooni ja mitte rohkem.

Muidugi teid huvitab nüüd, kas sellest oli ka mingit kasu: jah, väga vähe. Ma sain ajaks 53 millisekki, mis on 17% ajavõitu. Arvata on, et minu 8-tuumane prose pani variandis 2 käiku kõik tuumad, aga variandis 3 ainult 4 tuuma.

Pange tähele, et muutuja p (tüübiga Tuple<int, int>) meetodid Item1 ja Item2 ütlevad, mis on iga sektsiooni piirid, ja siin ei ole enam võimalik jadaprogrammis foreach-lähenemist kasutada.

Konkreetsete operatsioonide optimeerimisel on sektsioonimine ilmselt üks etapp, mis tuleb sul läbi katsetada, et kindlaks teha, milline lähenemine on parim. Muidugi sõltub tegelik mõju omakorda kliendi arvuti prosest ja võimekusest, aga lohutav on teada, et rööplemine on üsnagi täpselt konfitav.

Järgmises blogis vaatame veel paari muudetavat parameetrit.

Eranditöötlus rööpsilmuses

Mis saab, kui ühes rööpsilmuse lõimes tekkib erand? Teised silmused lõpetavad töö samamoodi nagu Stop()-meetodi puhul. Erandi poolt tekitatud katkestus on suurema prioriteediga kui Stop() ja Break(), aga siiski lubatakse kõigil lõimedel oma käsilolev tsükkel lõpule viia.

Klassil ParallelLoopState on selline asi nagu IsExceptional, mida pikalt töötavad tsüklid saavad aeg-ajalt kontrollida, et siis kiiremini pillid kotti panna. Alltoodud näites ei ole state.IsExceptional kunagi tõene, sest seda küsitakse tsükli alguses ja uut tsüklit ei alustata kunagi, kui mingi tsükkel on erandi tekitanud. Aga pikema tsükli keskel on küsimusel jumet.

static void Main(string[] args)
{
try
{
Parallel.For(0, 20, (i, state) =>
{
if (state.IsExceptional)
Console.WriteLine("Ups!");
else
{
Console.WriteLine(i);
if (i == 3)
throw new Exception("Ämber");
}
});
}
catch (Exception x)
{
Console.WriteLine(x.Message);
}
}

Kui sa selle koodi käivitad, võid näha, et pärast vea genereerimist lõpetavad tsüklid töö, kus keegi parasjagu on. Silmus ise tekitab erandi “One or more errors occurred” ja rohkem infot sisaldab InnerException.

Categories: Alustus Sildid:, ,
Follow

Get every new post delivered to your Inbox.