Skip to content

Commit

Permalink
Fix ordering of pthread_create(pthread_t *thread)
Browse files Browse the repository at this point in the history
This change fixes a bug where signal_latency_async_test would flake,
albeit less than 1/1000 of the time. What was happening was that
pthread_kill(sender_thread) would return EFAULT. This was because
pthread_create() was not storing the thread object pointer in *thread
until after clone() had been called. So it was actually possible for
the main thread to stall after calling clone(), and during that time
the receiver would launch, receive a signal from the sender thread, and
then fail when it tried to send a pong. At first I thought I'd use a
barrier in the test to synchronize thread creation, but I firmly believe
that pthread_create() was to blame, and that is now fixed.
  • Loading branch information
jart committed Jan 4, 2025
1 parent ed6d133 commit e939659
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 63 deletions.
12 changes: 4 additions & 8 deletions libc/intrin/stack.c
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ relegated bool TellOpenbsdThisIsStackMemory(void *addr, size_t size) {

// OpenBSD only permits RSP to occupy memory that's been explicitly
// defined as stack memory, i.e. `lo <= %rsp < hi` must be the case
relegated errno_t FixupCustomStackOnOpenbsd(pthread_attr_t *attr) {
relegated bool FixupCustomStackOnOpenbsd(pthread_attr_t *attr) {

// get interval
uintptr_t lo = (uintptr_t)attr->__stackaddr;
Expand All @@ -503,15 +503,11 @@ relegated errno_t FixupCustomStackOnOpenbsd(pthread_attr_t *attr) {
hi = hi & -__pagesize;

// tell os it's stack memory
errno_t olderr = errno;
if (!TellOpenbsdThisIsStackMemory((void *)lo, hi - lo)) {
errno_t err = errno;
errno = olderr;
return err;
}
if (!TellOpenbsdThisIsStackMemory((void *)lo, hi - lo))
return false;

// update attributes with usable stack address
attr->__stackaddr = (void *)lo;
attr->__stacksize = hi - lo;
return 0;
return true;
}
2 changes: 1 addition & 1 deletion libc/intrin/stack.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ void cosmo_stack_unlock(void);
void cosmo_stack_wipe(void);

bool TellOpenbsdThisIsStackMemory(void *, size_t);
errno_t FixupCustomStackOnOpenbsd(pthread_attr_t *);
bool FixupCustomStackOnOpenbsd(pthread_attr_t *);

COSMOPOLITAN_C_END_
#endif /* COSMOPOLITAN_LIBC_STACK_H_ */
2 changes: 1 addition & 1 deletion libc/thread/posixthread.internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ forceinline pureconst struct PosixThread *_pthread_self(void) {
}

forceinline void _pthread_ref(struct PosixThread *pt) {
atomic_fetch_add_explicit(&pt->pt_refs, 1, memory_order_acq_rel);
atomic_fetch_add_explicit(&pt->pt_refs, 1, memory_order_relaxed);
}

forceinline void _pthread_unref(struct PosixThread *pt) {
Expand Down
43 changes: 25 additions & 18 deletions libc/thread/pthread_create.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,12 @@ static errno_t pthread_create_impl(pthread_t *thread,
const pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg,
sigset_t oldsigs) {
int rc, e = errno;
errno_t err;
struct PosixThread *pt;

// create posix thread object
if (!(pt = calloc(1, sizeof(struct PosixThread)))) {
errno = e;
if (!(pt = calloc(1, sizeof(struct PosixThread))))
return EAGAIN;
}
dll_init(&pt->list);
pt->pt_locale = &__global_locale;
pt->pt_start = start_routine;
Expand All @@ -215,7 +213,6 @@ static errno_t pthread_create_impl(pthread_t *thread,
// create thread local storage memory
if (!(pt->pt_tls = _mktls(&pt->tib))) {
free(pt);
errno = e;
return EAGAIN;
}

Expand All @@ -232,9 +229,9 @@ static errno_t pthread_create_impl(pthread_t *thread,
// caller supplied their own stack
// assume they know what they're doing as much as possible
if (IsOpenbsd()) {
if ((rc = FixupCustomStackOnOpenbsd(&pt->pt_attr))) {
if (!FixupCustomStackOnOpenbsd(&pt->pt_attr)) {
_pthread_free(pt);
return rc;
return EPERM;
}
}
} else {
Expand All @@ -259,7 +256,7 @@ static errno_t pthread_create_impl(pthread_t *thread,
if (!(pt->pt_attr.__sigaltstackaddr =
malloc(pt->pt_attr.__sigaltstacksize))) {
_pthread_free(pt);
return errno;
return EAGAIN;
}
pt->pt_flags |= PT_OWNSIGALTSTACK;
}
Expand All @@ -282,35 +279,41 @@ static errno_t pthread_create_impl(pthread_t *thread,
memory_order_relaxed);
break;
default:
_pthread_free(pt);
return EINVAL;
// pthread_attr_setdetachstate() makes this impossible
__builtin_unreachable();
}

// if pthread_attr_setdetachstate() was used then it's possible for
// the `pt` object to be freed before this clone call has returned!
atomic_store_explicit(&pt->pt_refs, 1, memory_order_relaxed);

// add thread to global list
// we add it to the beginning since zombies go at the end
_pthread_lock();
dll_make_first(&_pthread_list, &pt->list);
_pthread_unlock();

// if pthread_attr_setdetachstate() was used then it's possible for
// the `pt` object to be freed before this clone call has returned!
_pthread_ref(pt);
// we don't normally do this, but it's important to write the result
// memory before spawning the thread, so it's visible to the threads
*thread = (pthread_t)pt;

// launch PosixThread(pt) in new thread
if ((rc = clone(
if ((err = clone(
PosixThread, pt->pt_attr.__stackaddr, pt->pt_attr.__stacksize,
CLONE_VM | CLONE_THREAD | CLONE_FS | CLONE_FILES | CLONE_SIGHAND |
CLONE_SYSVSEM | CLONE_SETTLS | CLONE_PARENT_SETTID |
CLONE_CHILD_SETTID | CLONE_CHILD_CLEARTID,
pt, &pt->tib->tib_ptid, __adj_tls(pt->tib), &pt->tib->tib_ctid))) {
*thread = 0; // posix doesn't require we do this
_pthread_lock();
dll_remove(&_pthread_list, &pt->list);
_pthread_unlock();
_pthread_free(pt);
return rc;
if (err == ENOMEM)
err = EAGAIN;
return err;
}

*thread = (pthread_t)pt;
return 0;
}

Expand Down Expand Up @@ -359,7 +362,7 @@ static const char *DescribeHandle(char buf[12], errno_t err, pthread_t *th) {
* └──────────────┘
*
* @param thread is used to output the thread id upon success, which
* must be non-null
* must be non-null; upon failure, its value is undefined
* @param attr points to launch configuration, or may be null
* to use sensible defaults; it must be initialized using
* pthread_attr_init()
Expand All @@ -375,14 +378,18 @@ static const char *DescribeHandle(char buf[12], errno_t err, pthread_t *th) {
errno_t pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine)(void *), void *arg) {
errno_t err;
errno_t olderr = errno;
_pthread_decimate(kPosixThreadZombie);
BLOCK_SIGNALS;
err = pthread_create_impl(thread, attr, start_routine, arg, _SigMask);
ALLOW_SIGNALS;
STRACE("pthread_create([%s], %p, %t, %p) → %s",
DescribeHandle(alloca(12), err, thread), attr, start_routine, arg,
DescribeErrno(err));
if (!err)
if (!err) {
_pthread_unref(*(struct PosixThread **)thread);
} else {
errno = olderr;
}
return err;
}
42 changes: 15 additions & 27 deletions test/posix/signal_latency_async_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ void receiver_signal_handler(int signo) {
}

void *sender_func(void *arg) {

for (int i = 0; i < ITERATIONS; i++) {

// Wait a bit sometimes
if (rand() % 2 == 1) {
if (rand() % 2) {
volatile unsigned v = 0;
for (;;)
if (++v == 4000)
Expand All @@ -67,32 +66,25 @@ void *sender_func(void *arg) {
}

void *receiver_func(void *arg) {

// Wait for asynchronous signals
for (;;) {
static int iteration = 0;
do {
// wait for signal handler to be called
if (atomic_exchange_explicit(&receiver_got_signal, 0,
memory_order_acq_rel)) {

// record received time
struct timespec receive_time;
clock_gettime(CLOCK_MONOTONIC, &receive_time);

long sec_diff = receive_time.tv_sec - send_time.tv_sec;
long nsec_diff = receive_time.tv_nsec - send_time.tv_nsec;
double latency_ns = sec_diff * 1e9 + nsec_diff;
latencies[iteration++] = latency_ns;

static int iteration = 0;
if (iteration < ITERATIONS)
latencies[iteration++] = latency_ns;

// Pong sender
// pong sender
if (pthread_kill(sender_thread, SIGUSR2))
exit(2);

// Exit if done
if (iteration >= ITERATIONS)
pthread_exit(0);
}
}

} while (iteration < ITERATIONS);
return 0;
}

Expand All @@ -108,11 +100,7 @@ int compare(const void *a, const void *b) {

int main() {

// TODO(jart): fix flakes
if (1)
return 0;

// Install signal handlers
// install handlers
struct sigaction sa;
sa.sa_handler = receiver_signal_handler;
sa.sa_flags = 0;
Expand All @@ -121,27 +109,27 @@ int main() {
sa.sa_handler = sender_signal_handler;
sigaction(SIGUSR2, &sa, 0);

// Create receiver thread first
// create receiver thread first
if (pthread_create(&receiver_thread, 0, receiver_func, 0))
exit(11);

// Create sender thread
// create sender thread
if (pthread_create(&sender_thread, 0, sender_func, 0))
exit(12);

// Wait for threads to finish
// wait for threads to finish
if (pthread_join(sender_thread, 0))
exit(13);
if (pthread_join(receiver_thread, 0))
exit(14);

// Compute mean latency
// compute mean latency
double total_latency = 0;
for (int i = 0; i < ITERATIONS; i++)
total_latency += latencies[i];
double mean_latency = total_latency / ITERATIONS;

// Sort latencies to compute percentiles
// sort latencies to compute percentiles
qsort(latencies, ITERATIONS, sizeof(double), compare);

double p50 = latencies[(int)(0.50 * ITERATIONS)];
Expand Down
19 changes: 11 additions & 8 deletions tool/scripts/flakes
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@ import concurrent.futures
from collections import Counter
from typing import List, Dict, Tuple

NUM_PARALLEL = int(os.cpu_count() * 1.5)
NUM_PARALLEL = int(os.cpu_count() * 20)

def find_test_files(root_dir: str) -> List[str]:
def find_test_files(root: str) -> List[str]:
"""Find all executable files ending with _test recursively."""
test_files = []
for root, _, files in os.walk(root_dir):
for file in files:
if file.endswith('_test'):
file_path = os.path.join(root, file)
if os.access(file_path, os.X_OK):
test_files.append(file_path)
if os.path.isdir(root):
for root, _, files in os.walk(root):
for file in files:
if file.endswith('_test'):
file_path = os.path.join(root, file)
if os.access(file_path, os.X_OK):
test_files.append(file_path)
elif root.endswith('_test'):
test_files.append(root)
return test_files

def run_single_test(test_path: str) -> int:
Expand Down

0 comments on commit e939659

Please sign in to comment.