6.087 Lecture 13 – January 28, 2010Review Multithreaded Programming Race Conditions Semaphores Thread Safety, Deadlock, and Starvation Sockets and Asynchronous I/O Sockets Asynchronous I/O 1 Review: Multithreaded programming• Thread: abstraction of parallel processing with shared memory • Program organized to execute multiple threads in parallel • Threads spawned by main thread, communicate via shared resources and joining • pthread library implements multithreading int pthread_create ( pthread_t thread , const pthread _attr _t attr , ∗ ∗• void ∗(∗ start _routine )( void ∗), void ∗ arg ); • void pthread_exit(void ∗value_ptr); • int pthread_join(pthread_t thread, void ∗∗value_ptr); • pthread_t pthread_self(void); 1 Review: Resource sharingAccess to shared resources need to be controlled to • ensure deterministic operation • Synchronization objects: mutexes, semaphores, read/write locks, barriers • Mutex: simple single lock/unlock mechanism • int pthread_mutex_init(pthread_mutex_t ∗mutex, const pthread_mutexattr_t ∗ attr); • int pthread_mutex_destroy(pthread_mutex_t ∗mutex); • int pthread_mutex_lock(pthread_mutex_t ∗mutex); • int pthread_mutex_trylock(pthread_mutex_t ∗mutex); • int pthread_mutex_unlock(pthread_mutex_t ∗mutex); 2 Review: Condition variables• Lock/unlock (with mutex) based on run-time condition variableAllows thread to wait for condition to be true• • Other thread signals waiting thread(s), unblocking them • int pthread_cond_init(pthread_cond_t ∗cond, const pthread_condattr_t ∗attr); • int pthread_cond_destroy(pthread_cond_t ∗cond); • int pthread_cond_wait(pthread_cond_t ∗cond, pthread_mutex_t ∗mutex); • int pthread_cond_broadcast(pthread_cond_t ∗cond); • int pthread_cond_signal(pthread_cond_t ∗cond); 3 6.087 Lecture 13 – January 28, 2010Review Multithreaded Programming Race Conditions Semaphores Thread Safety, Deadlock, and Starvation Sockets and Asynchronous I/O Sockets Asynchronous I/O 4 Multithreaded programming• OS implements scheduler – determines which threads execute when • Scheduling may execute threads in arbitrary order • Without proper synchronization, code can execute non-deterministically • Suppose we have two threads: 1 reads a variable, 2 modifies that variable • Scheduler may execute 1, then 2, or 2 then 1 Non-determinism creates a race condition – where the • behavior/result depends on the order of execution 4 Race conditions• Race conditions occur when multiple threads share a variable, without proper synchronization • Synchronization uses special variables, like a mutex, to ensure order of execution is correct • Example: thread T1 needs to do something before thread T2 • condition variable forces thread T2 to wait for thread T1 • producer-consumer model program • Example: two threads both need to access a variable and modify it based on its valuesurround access and modification with a mutex• • mutex groups operations together to make them atomic – treated as one unit 5 Race conditions in assemblyConsider the following program race.c: unsigned i n t cnt = 0; void ∗count ( void ∗arg) {/∗ thread body ∗/int i; for ( i = 0; i < 100000000; i ++) cnt++; return NULL ; } int main ( void ){ pthread_t tids [4]; int i; for (i =0; i<4; i++) pthread_create(&tids[i], NULL, count, NULL);for (i =0; i<4; i++)pthread _join(tids[i], NULL);printf ("cnt=%u\n " ,cnt );return 0;} What is the value of cnt? [Bryant and O’Halloran. Computer Systems: A Programmer’s Perspective. Prentice Hall, 2003.] © Prentice Hall. All rights reserved. This content is excluded from our Creative Commons license. For more information, see http://ocw.mit.edu/fairuse. 6Race conditions in assemblyIdeally, should increment cnt 4 × 100000000 times, so cnt = 400000000. However, running our code gives: athena% ./race.o cnt=137131900 athena% ./race.o cnt=163688698 athena% ./race.o cnt=163409296 athena% ./race.o cnt=170865738 athena% ./race.o cnt=169695163 So, what happened? Athena is MIT's UNIX-based computing environment. OCW does not provide access to it. 7 1 1Race conditions in assembly• C not designed for multithreading • No notion of atomic operations in C • Increment cnt++; maps to three assembly operations: 1. load cnt into a register 2. increment value in register 3. save new register value as new cnt • So what happens if thread interrupted in the middle? Race condition! • 8 Race conditions in assemblyLet’s fix our code: pthread_mutex_t mutex; unsigned i n t cnt = 0; void ∗count ( void ∗arg) {/∗ thread body ∗/int i; for ( i = 0; i < 100000000; i ++) { pthread_mutex_lock(&mutex );cnt++;pthread_mutex_unlock(&mutex );} return NULL ; } int main ( void ){ pthread_t tids [4]; int i; pthread_mutex_init(&mutex, NULL); for (i =0; i<4; i++) pthread_create(&tids[i], NULL, count, NULL); for (i =0; i<4; i++) pthread _join(tids[i], NULL);pthread_mutex_destroy(&mutex );printf ("cnt=%u\n " ,cnt );return 0;} 9 Race conditions• Note that new code functions correctly, but is much slower • C statements not atomic – threads may be interrupted at assembly level, in the middle of a C statement • Atomic operations like mutex locking must be specified as atomic using special assembly instructions • Ensure that all statements accessing/modifying shared variables are synchronized 10 Semaphores• Semaphore – special nonnegative integer variable s, initially 1, which implements two atomic operations: • P(s) – wait until s> 0, decrement s and return • V(s) – increment s by 1, unblocking a waiting thread • Mutex – locking calls P(s) and unlocking calls V(s) • Implemented in , part of library rt, not pthread 11 Using semaphores• Initialize semaphore to value: int sem_init(sem_t ∗sem, int pshared, unsigned int value); • Destroy semaphore: int sem_destroy(sem_t ∗sem); • Wait to lock, blocking: int sem_wait(sem_t ∗sem); • Try to lock, returning immediately (0 if now locked, −1 otherwise): int sem_trywait(sem_t ∗sem); • Increment semaphore, unblocking a waiting thread: int sem_post(sem_t ∗sem); 12 Producer and consumer revisited• Use a semaphore to track available slots in shared buffer • Use a semaphore to track items in shared buffer • Use a semaphore/mutex to make buffer operations synchronous 13 Producer and consumer revisited#include for (i = 0; i sem_wait(&items);#include sem_wait(&mutex);printf (" consumed(%ld):%d\n " , sem_t mutex, slots , items; pthread_self(), i+1); sem_post(&mutex ); #define SLOTS 2 sem_post(&slots); #define ITEMS 10 } return NULL; void∗ produce ( void∗ arg) } { int i; int main ()for (i = 0; i (extension of C standard library) • Network I/O, due to latency, usually implemented asynchronously, using multithreading • Sockets use client/server model of establishing connections 23 Creating a socket• Create a socket, getting the file descriptor for that socket: int socket(int domain, int type, int protocol ); • domain – use constant AF_INET, so we’re using the internet; might also use AF_INET6 for IPv6 addresses • type – use constant SOCK_STREAM for connection-based protocols like TCP/IP; use SOCK_DGRAM for connectionless datagram protocols like UDP (we’ll concentrate on the former) • protocol – specify 0 to use default protocol for the socket type (e.g. TCP) • returns nonnegative integer for file descriptor, or −1 if couldn’t create socket • Don’t forget to close the file descriptor when you’re done! 24 Connecting to a server• Using created socket, we connect to server using: int connect(int fd, struct sockaddr ∗addr, int addr_len); • fd – the socket’s file descriptor • addr – the address and port of the server to connect to; for internet addresses, cast data of type struct sockaddr_in, which has the following members: • sin_family – address family; always AF_INET • sin_port – port in network byte order (use htons() to convert to network byte order) • sin_addr.s_addr – IP address in network byte order (use htonl() to convert to network byte order) • addr_len – size of sockaddr_in structurereturns 0 if successful• 25 Associate server socket with a port• Using created socket, we bind to the port using: int bind(int fd, struct sockaddr ∗addr, int addr_len); • fd, addr, addr_len – same as for connect() note that address should be IP address of desired interface • (e.g. eth0) on local machine • ensure that port for server is not taken (or you may get “address already in use” errors) • return 0 if socket successfully bound to port 26 Listening for clients• Using the bound socket, start listening: int listen (int fd, int backlog); • fd – bound socket file descriptor • backlog – length of queue for pending TCP/IP connections; normally set to a large number, like 1024 returns 0 if successful • 27 Accepting a client’s connection• Wait for a client’s connection request (may already be queued): int accept(int fd, struct sockaddr ∗addr, int ∗addr_len); • fd – socket’s file descriptor • addr – pointer to structure to be filled with client address info (can be NULL) • addr_len – pointer to int that specifies length of structure pointed to by addr; on output, specifies the length of the stored address (stored address may be truncated if bigger than supplied structure) • returns (nonnegative) file descriptor for connected client socket if successful 28 Reading and writing with sockets• Send data using the following functions: int write(int fd, const void ∗buf, size_t len ); int send(int fd, const void ∗buf, size_t len, int flags ); • Receive data using the following functions: int read(int fd, void ∗buf, size_t len ); int recv(int fd, void ∗buf, size_t len, int flags ); • fd – socket’s file descriptorbuf – buffer of data to read or write• • len – length of buffer in bytes • flags – special flags; we’ll just use 0 • all these return the number of bytes read/written (if successful) 29 Asynchronous I/O• Up to now, all I/O has been synchronous – functions do not return until operation has been performed • Multithreading allows us to read/write a file or socket without blocking our main program code (just put I/O functions in a separate thread) • Multiplexed I/O – use select() or poll() with multiple file descriptors 30 I/O multiplexing with select()• To check if multiple files/sockets have data to read/write/etc: (include ) int select(int nfds, fd_set ∗readfds, fd_set ∗writefds, fd_set ∗errorfds, struct timeval ∗timeout); • nfds – specifies the total range of file descriptors to be tested (0 up to nfds−1) • readfds, writefds, errorfds – if not NULL, pointer to set of file descriptors to be tested for being ready to read, write, or having an error; on output, set will contain a list of only those file descriptors that are ready • timeout – if no file descriptors are ready immediately, maximum time to wait for a file descriptor to be ready • returns the total number of set file descriptor bits in all the sets • Note that select() is a blocking function 31 I/O multiplexing with select()• fd_set – a mask for file descriptors; bits are set (“1”) if in the set, or unset (“0”) otherwise • Use the following functions to set up the structure: • FD_ZERO(&fdset) – initialize the set to have bits unset for all file descriptors • FD_SET(fd, &fdset) – set the bit for file descriptor fd in the set • FD_CLR(fd, &fdset) – clear the bit for file descriptor fd in the set • FD_ISSET(fd, &fdset) – returns nonzero if bit for file descriptor fd is set in the set 32 I/O multiplexing using poll()• Similar to select(), but specifies file descriptors differently: (include ) int poll (struct pollfd fds [], nfds_t nfds, int timeout); • fds – an array of pollfd structures, whose members fd, events, and revents, are the file descriptor, events to check (OR-ed combination of flags like POLLIN, POLLOUT, POLLERR, POLLHUP), and result of polling with that file descriptor for those events, respectively • nfds – number of structures in the array • timeout – number of milliseconds to wait; use 0 to return immediately, or −1 to block indefinitely 33 Summary• Multithreaded programming race conditions • • semaphores • thread safety deadlock and starvation • • Sockets, asynchronous I/O • client/server socket functions • select() and poll() 34 MIT OpenCourseWare http://ocw.mit.edu 6.087 Practical Programming in C January (IAP) 2010 For information about citing these materials or our Terms of Use,visit: http://ocw.mit.edu/terms.