przejście do zawartości
Jan Kończak
Narzędzia użytkownika
Zaloguj
Narzędzia witryny
Narzędzia
Pokaż stronę
Poprzednie wersje
Odnośniki
Ostatnie zmiany
Menadżer multimediów
Indeks
Zaloguj
Ostatnie zmiany
Menadżer multimediów
Indeks
Jesteś tutaj:
start
»
bio-psiec
»
threads
bio-psiec:threads
Ta strona jest tylko do odczytu. Możesz wyświetlić źródła tej strony ale nie możesz ich zmienić.
<html><style> pre {margin-top:-1.4em;line-height:1.2em} ol {margin-top:-1.4em} </style></html> ====== Wprowadzenie ====== Wyobraź sobie, że masz do napisania grę "kto szybciej pisze na klawiaturze" z następującymi zasadami: * trzech graczy łączy się do gry, * kiedy gracze dołączą, serwer rozpoczyna kolejno trzy rundy, * na początku każdej rundy serwer wysyła do graczy tekst który mają przepisać, * rundę wygrywa gracz który najszybciej prześle poprawnie przepisany tekst, * po zakończeniu trzeciej rundy klienci są rozłączani, a program serwera się kończy. Przygotowujesz logikę — bazę tekstów do przepisywania, licznik rund, listę klientów, funkcję rozpoczynającą nową rundę i kończącą grę jeśli rozegrano już trzy rundy, funkcję oceniającą odpowiedź od gracza: <code C++> #include <algorithm> #include <cstdlib> #include <iostream> #include <string> #include <thread> #include <vector> #include <netinet/in.h> #include <sys/socket.h> #include <unistd.h> std::vector<std::string> randomTexts = {"Colors may fade.", "Type louder, please.", "Natural laws have no pity.", "If in doubt, mumble.", "Place stamp here."}; int currRound = -1; int clients[3]; void nextRound() { if (++currRound == 3) { for (int i = 0; i < 3; ++i) { write(clients[i], "Game ended. Thanks for playing.\n", 33); shutdown(clients[i], SHUT_RDWR); close(clients[i]); } exit(0); } std::string msg = "\nNew round begins! Please type:\n" + randomTexts[currRound] + "\n"; for (int i = 0; i < 3; ++i) write(clients[i], msg.c_str(), msg.length()); } void checkAnswer(int cliFd, const char *ansBytes, int ansLength) { if ((randomTexts[currRound] + "\n") != std::string(ansBytes, ansLength)) { write(cliFd, "Wrong! Try again.\n", 18); return; } for (int i = 0; i < 3; ++i) if (clients[i] == cliFd) write(clients[i], "Correct!\n", 9); else write(clients[i], "Somone else was faster!\n", 25); nextRound(); } </code> Następnie zaczynasz pisać funkcję ''main'' – rozpoczynasz nasłuchiwanie, przyjmujesz trzech klientów, rozpoczynasz pierwszą rundę: <code C++> int main(int argc, char **argv) { std::random_shuffle(randomTexts.begin(), randomTexts.end()); if (argc < 2) { std::cerr << "Missing port number!" << std::endl; return 1; } sockaddr_in sa{}; sa.sin_family = AF_INET; sa.sin_port = htons(atoi(argv[1])); int servFd = socket(AF_INET, SOCK_STREAM, 0); if (-1 == bind(servFd, (sockaddr *)&sa, sizeof(sa))) { perror("bind failed"); return 1; } listen(servFd, 1); for (int i = 0; i < 3; ++i) { clients[i] = accept(servFd, 0, 0); if (i != 2) write(clients[i], "Wait for others...\n", 19); } close(servFd); nextRound(); </code> I teraz musisz odebrać wiadomość od tego klienta, który pierwszy wyśle przepisany tekst. Wiesz że można zrobić to kodem: <code C++> char buf[256]; int cnt = read(cliFd, buf, 256); if (cnt <= 0) exit(1); checkAnswer(cliFd, buf, cnt); </code> <html><div style="margin-top:-1.4em"></div></html> Ale zaraz, nie wiesz przecież który gracz pierwszy wyśle przepisany tekst! \\ Dlatego musisz jednocześnie czekać na przyjście wiadomości od każdego z tych trzech graczy. Tylko jak to zrobić? ====== Obsługa wielu zdarzeń naraz ====== Typowo aplikacje (jakiekolwiek, włączając sieciowe) muszą jednocześnie obsługiwać wiele źródeł zdarzeń – np. z sieci przyszła wiadomość, użytkownik kliknął na menu czy wpisał coś z klawiatury, minął czas oczekiwania na coś. \\ Często funkcje obsługujące takie zdarzenia po prostu blokują się czekając aż oczekiwane zdarzenie wystąpi. \\ Przykładowo domyślnie operacje na gniazdach, np. ''connect'', ''accept'' czy ''read'', blokują przetwarzanie (odpowiednio czekając na nawiązanie połączenia, przyjście nowego klienta i przyjście wiadomości). \\ Wcześniej na zajęciach była mowa że można to zmienić na zachowanie nieblokujące, ale oczywiście aktywne czekanie ([[https://en.wikipedia.org/wiki/Busy_waiting|busy waiting]]) jest bardzo głupim pomysłem((choć są od tego wyjątki - np. [[https://en.wikipedia.org/wiki/Spinlock|spinlock]] czy [[https://www.dpdk.org/|DPDK]])), bo niepotrzebnie zużywa czas procesora. Do jednoczesnej obsługi wielu źródeł zdarzeń stworzono dedykowane metody, można też używać podejścia wielowątkowego. Tworząc aplikację sieciową można ją napisać: * iteracyjnie, kiedy współbieżność jest zbędna, * używając pętli zdarzeń ([[https://en.wikipedia.org/wiki/Event_loop|event loop]]) – pętli w której najpierw specjalną funkcją programista prosi system operacyjny o wskazanie które zdarzenie nastąpiło pierwsze, a potem je obsługuje, * jako aplikację wielowątkową, w której każde źródło zdarzeń (np. gniazdo) jest obsługiwane w osobnym wątku. ====== Krótko o wątkach ====== ==== Wstęp ==== Wiesz, że procesy (uruchomione programy) wykonują, jedna po drugiej, następujące po sobie instrukcje programu. Takie wykonywanie po kolei instrukcji programu można nazwać wątkiem. \\ Jedną z instrukcji którą programista może umieścić w swoim programie jest poproszenie systemu operacyjnego o uruchomienie nowego wątku przetwarzania, wskazując którą instrukcję nowy wątek ma wykonać jako pierwszą. \\ Po stworzeniu drugiego wątku (zakładając że w procesie był jeden wątek) proces robi dwie rzeczy naraz – jeden wątek wykonuje kolejną instrukcję po tej żądającej tworzenia nowego wątku, drugi wykonuje kolejne instrukcje zaczynając od wskazanej. \\ O tym który wątek używa w danej chwili którego procesora decyduje system operacyjny – dwa wątki tego samego procesu mogą np. naprzemiennie dostawać czas na tym samym procesorze, albo jednocześnie dostać czas (każdy wątek na innym procesorze). System operacyjny dba o to, żeby każdy proces miał własną pamięć operacyjną (w której między innymi trzyma wartości zmiennych), własną listę otwartych plików, własny katalog roboczy, etc. \\ Wątki tego samego procesu używają tej samej pamięci, tych samych plików, tego samego katalogu roboczego etc. \\ Każdy wątek ma oddzielnie od innych tylko te rzeczy, które są związane z informacjami o kolejno wykonywanych instrukcjach i ze stanem procesora (np. informację czy rezultat poprzedniej operacji matematycznej był ujemny). Poza instrukcją która prosi system operacyjny o uruchomienie nowego wątku, jest też instrukcja która prosi o poczekanie na zakończenie wskazanego wątku. Można dzięki temu np. wykonać część obliczeń w bieżącym wątku, a część w nowo do tego celu utworzonym, a kiedy potrzeba już wyniku poczekać aż ten dodatkowy wątek się zakończy. \\ Systemy operacyjne wymagają od programisty albo żeby poinformował go że nigdy nie wywoła funkcji czekające na zakończenie wskazanego wątku, albo żeby ostatecznie ją wykonał – do tego czasu (po zakończeniu wątku) system musi wciąż pamiętać że ten się zakończył. ==== C++ ==== Wątki w języku C++: * tworzy się w konstruktorze klasy ''std::thread'', czyli przy tworzeniu zmiennej typu ''std::thread'', * zmiennych z klasy ''std::thread'' nie można kopiować (ale można je przenosić), * na każdej zmiennej z klasy ''std::thread'' trzeba wykonać albo ''join'' albo ''detach'' przed wywołaniem destruktora. Przykładowy kod tworzący jeden wątek i czekający aż ten się zakończy: <code c++> #include <iostream> #include <thread> #include <unistd.h> void funkcja() { for (int i = 0; i < 5; ++i) { std::cout << "drugi wątek, iteracja " << i << std::endl; usleep(100000); } } int main() { std::thread wątek(funkcja); for (int i = 0; i < 2; ++i) { std::cout << "główny wątek, iteracja " << i << std::endl; usleep(100000); } wątek.join(); return 0; } </code> Przykładowy kod tworzący jeden wątek i NIE czekający aż ten się zakończy: <code c++> #include <iostream> #include <thread> #include <unistd.h> void funkcja() { for (int i = 0; i < 5; ++i) { std::cout << "drugi wątek, iteracja " << i << std::endl; usleep(100000); } } int main() { std::thread wątek(funkcja); wątek.detach(); for (int i = 0; i < 2; ++i) { std::cout << "główny wątek, iteracja " << i << std::endl; usleep(100000); } return 0; } </code> Przekazywanie argumentów do funkcji którą ma wykonywać wątek odbywa się następująco: <code c++> #include <iostream> #include <string> #include <thread> #include <unistd.h> void funkcja(int ilość, std::string tekst) { for (int i = 0; i < ilość; ++i) { std::cout << tekst << ", iteracja " << i << std::endl; usleep(100000); } } int main() { std::thread wątek(funkcja, 3, "drugi wątek"); wątek.detach(); for (int i = 0; i < 3; ++i) { std::cout << "główny wątek, iteracja " << i << std::endl; usleep(100000); } return 0; } </code> Dowolny wątek może stworzyć nowy wątek; poniżej przykład kodu tworzący wiele wątków: <code c++> #include <iostream> #include <string> #include <thread> #include <unistd.h> void funkcja(int ilość, std::string tekst) { for (int i = 0; i < ilość; /*nic*/) { std::cout << tekst << ", iteracja " << i << std::endl; usleep(100000); ++i; std::thread(funkcja, ilość - i, tekst + "_" + std::to_string(i)).detach(); } } int main() { std::thread(funkcja, 5, "wątek 1").detach(); for (int i = 0; i < 6; ++i) { std::cout << "wątek 0, iteracja " << i << std::endl; usleep(100000); } return 0; } </code> ==== Synchronizacja wątków ==== Wątki tego samego procesu pracują na tej samej pamięci, tych samych plikach etc. \\ Jednoczesna praca na tych samych zasobach (np. tej samej zmiennej) z dwóch wątków może doprowadzić do problemów. Poniżej 2 wątki, każdy 100000 razy, zwiększają wartość zmiennej ''a'' i ''b'' o 1.001 raza. <code c++> #include <iostream> #include <thread> double a = 1.0; double b = 1.0; void funkcja() { for (int i = 0; i < 100000; ++i) { a = a * 1.001; b = b * 1.001; } } int main() { std::thread wątek(funkcja); funkcja(); wątek.join(); std::cout << a << std::endl << b << std::endl; return 0; } </code> Oczekiwany wynik zarówno dla zmiennej ''a'' jak i ''b'' to //1.0·1.001<sup>2*100000</sup> ≈ 6.54e86//, ale po uruchomieniu programu można zobaczyć inne wyniki, np. //a≈2.9e52// i //b≈1.5e49//, czy //a≈3.2e62// i //b≈4.2e46//. \\ Procesor wykonując operację ''a = a * 1.001;'' najpierw odczytuje wartość ''a'', potem mnoży ją przez 1.001, i na końcu zapisuje wynik z powrotem do zmiennej ''a'' – w trzech oddzielnych krokach. \\ Jeden z możliwych scenariuszy jest taki, że dwa wątki działające na dwóch różnych procesorach równocześnie odczytają wartkość ''a'', każdy pomnoży ją przez 1.001, a na końcu każdy zapisze wynik z powrotem do zmiennej ''a''. Wtedy mimo wykonania dwóch powiększeń ''a'', wartość zmiennej ''a'' zostanie powiększona tylko jednokrotnie. Do rozwiązania tego i podobnych problemów potrzebne jest dodanie przez programistę operacji synchronizujących pracę wątków tak, by nie korzystały naraz z tych samych rzeczy. \\ To jak należy poprawnie pisać współbieżne programy jest tematem na osobny przedmiot (w programie bioinformatyki to np. przedmiot obieralny z trzeciego semestru //podstawy programowania współbieżnego//). ====== Obsługa sieci z wielu wątków ====== Wracając do przykładu z początku zajęć: jeżeli więc trzeba czekać na wiadomość od trzech klientów naraz, można przygotować funkcję którą na każdym takim gnieździe trzeba wykonywać: <code c++> void readFromClient(int cliFd) { while (true) { char buf[256]; int cnt = read(cliFd, buf, 256); if (cnt <= 0) exit(1); checkAnswer(cliFd, buf, cnt); } } </code> A następnie dokończyć funkcję ''main'' liniami: <code c++> ... nextRound(); std::thread(readFromClient, clients[0]).detach(); std::thread(readFromClient, clients[1]).detach(); readFromClient(clients[2]); } </code> Standard POSIX określa czy i jak można ją wykonywać funkcje współbieżnie z innymi. Dla gniazd TCP można współbieżne wywoływać operacje wejścia/wyjścia na tym samym gnieździe i są one atomowe, tzn. wywołanie współbieżnie dwóch ''send'' (lub dwóch ''recv'') wykona najpierw jedno z tych wywołań, potem drugie. \\ Przy czym trzeba pamiętać, że niektóre operacje mogą wysłać czy odczytać mniej danych niż żądał programista, więc zwykle i tak synchronizacja (przynajmniej odbierania) na tym samym gnieździe jest konieczna. \\ Zamknięcie połączenia TCP z jednego wątku przerywa operacje czekające na odebranie wiadomości na innych wątkach (zwracają ''0'' sygnalizujące że zamknięto połączenie) i przerywa operacje wysyłania danych (zgłaszają sygnał SIGPIPE i zwracają -1 sygnalizujące błąd, ustawiając wcześniej ''errno'' na ''EPIPE''). ~~Zadanie.#~~ Połącz powyższe fragmenty kodu z odpowiednimi fragmentami z początku materiałów. Skompiluj program i przetestuj jego działanie. ~~Zadanie.#~~ Zastanów się co złego może się stać jeżeli program jednocześnie zacznie wykonywać funkcję ''checkAnswer'' z dwóch wątków (dla dwóch różnych graczy). ~~Zadanie.#~~ Napisz program który połączy się, używając TCP, pod wskazany adres, a następnie będzie równocześnie: * odczytywał dane wpisywane z klawiatury, i po odczytaniu wysyłał je przez sieć, * odbierał dane z sieci, i po odebraniu wypisywał je na ekran. ~~Zadanie.#~~ Napisz serwer TCP, który każdą otrzymaną wiadomość przekaże wszystkim połączonym klientom. Zauważ że serwer musi jednocześnie czekać na nowych klientów i jednocześnie odbierać wiadomości od każdego z już połączonych.
bio-psiec/threads.txt
· ostatnio zmienione: 2026/01/08 15:53 przez
jkonczak
Narzędzia strony
Pokaż stronę
Poprzednie wersje
Odnośniki
Złóż / rozłóż wszystko
Do góry