From mboxrd@z Thu Jan 1 00:00:00 1970
From: dick.r.chiang@gmail.com
Newsgroups: gmane.emacs.bugs
Subject: bug#36609: 27.0.50; Possible race-condition in threading implementation
Date: Sun, 06 Jun 2021 11:50:06 -0400
Message-ID: <87k0n73scx.fsf@dick>
References: <87muhks3b5.fsf@hochschule-trier.de>
In-Reply-To: <87muhks3b5.fsf@hochschule-trier.de> (Andreas Politz's message of "Thu, 11 Jul 2019 22:51:10 +0200")
To: Andreas Politz
Cc: 36609@debbugs.gnu.org
User-Agent: Gnus/5.13 (Gnus v5.13) Emacs/28.0.50 (gnu/linux)
X-GNU-PR-Message: followup 36609
X-GNU-PR-Package: emacs
X-GNU-PR-Keywords: fixed
Mime-Version: 1.0
Content-Type: multipart/mixed; boundary="=-=-="

--=-=-=
Content-Type: text/plain

Four attachments:

#1. Want to revert commit 9c62ffb

--=-=-=
Content-Type: text/x-diff
Content-Disposition: attachment; filename=0001-Revert-Fix-lock-failures-in-xg_select.patch

>From 01b8b847f5fc4b65eaac06c7c2d85f8c5410c497 Mon Sep 17 00:00:00 2001
From: dickmao
Date: Sun, 6 Jun 2021 11:10:13 -0400
Subject: [PATCH] Revert "Fix lock failures in xg_select"

This reverts commit 9c62ffb08262c82b7e38e6eb5767f2087424aa47.

* src/thread.c (really_call_select): revert 9c62ffb * src/xgselect.c (xg_select): revert 9c62ffb --- src/thread.c | 8 -------- src/xgselect.c | 42 ++++++++++++++---------------------------- src/xgselect.h | 2 -- 3 files changed, 14 insertions(+), 38 deletions(-) diff --git a/src/thread.c b/src/thread.c index f74f611148..609cd7c5fc 100644 --- a/src/thread.c +++ b/src/thread.c @@ -28,12 +28,6 @@ Copyright (C) 2012-2021 Free Software Foundation, Inc. #include "pdumper.h" #include "keyboard.h" -#if defined HAVE_GLIB && ! defined (HAVE_NS) -#include -#else -#define release_select_lock() do { } while (0) -#endif - union aligned_thread_state { struct thread_state s; @@ -592,8 +586,6 @@ really_call_select (void *arg) sa->result = (sa->func) (sa->max_fds, sa->rfds, sa->wfds, sa->efds, sa->timeout, sa->sigmask); - release_select_lock (); - block_interrupt_signal (&oldset); /* If we were interrupted by C-g while inside sa->func above, the signal handler could have called maybe_reacquire_global_lock, in diff --git a/src/xgselect.c b/src/xgselect.c index 0d91d55bad..d7c63e3be1 100644 --- a/src/xgselect.c +++ b/src/xgselect.c @@ -29,27 +29,6 @@ Copyright (C) 2009-2021 Free Software Foundation, Inc. #include "blockinput.h" #include "systime.h" -static ptrdiff_t threads_holding_glib_lock; -static GMainContext *glib_main_context; - -void release_select_lock (void) -{ - if (--threads_holding_glib_lock == 0) - g_main_context_release (glib_main_context); -} - -static void acquire_select_lock (GMainContext *context) -{ - if (threads_holding_glib_lock++ == 0) - { - glib_main_context = context; - while (!g_main_context_acquire (context)) - { - /* Spin. */ - } - } -} - /* `xg_select' is a `pselect' replacement. Why do we need a separate function? 1. Timeouts. Glib and Gtk rely on timer events. 
If we did pselect with a greater timeout then the one scheduled by Glib, we would @@ -75,19 +54,26 @@ xg_select (int fds_lim, fd_set *rfds, fd_set *wfds, fd_set *efds, GPollFD *gfds = gfds_buf; int gfds_size = ARRAYELTS (gfds_buf); int n_gfds, retval = 0, our_fds = 0, max_fds = fds_lim - 1; + bool context_acquired = false; int i, nfds, tmo_in_millisec, must_free = 0; bool need_to_dispatch; context = g_main_context_default (); - acquire_select_lock (context); + context_acquired = g_main_context_acquire (context); + /* FIXME: If we couldn't acquire the context, we just silently proceed + because this function handles more than just glib file descriptors. + Note that, as implemented, this failure is completely silent: there is + no feedback to the caller. */ if (rfds) all_rfds = *rfds; else FD_ZERO (&all_rfds); if (wfds) all_wfds = *wfds; else FD_ZERO (&all_wfds); - n_gfds = g_main_context_query (context, G_PRIORITY_LOW, &tmo_in_millisec, - gfds, gfds_size); + n_gfds = (context_acquired + ? g_main_context_query (context, G_PRIORITY_LOW, &tmo_in_millisec, + gfds, gfds_size) + : -1); if (gfds_size < n_gfds) { @@ -165,10 +151,8 @@ xg_select (int fds_lim, fd_set *rfds, fd_set *wfds, fd_set *efds, #else need_to_dispatch = true; #endif - if (need_to_dispatch) + if (need_to_dispatch && context_acquired) { - acquire_select_lock (context); - int pselect_errno = errno; /* Prevent g_main_dispatch recursion, that would occur without block_input wrapper, because event handlers call @@ -178,9 +162,11 @@ xg_select (int fds_lim, fd_set *rfds, fd_set *wfds, fd_set *efds, g_main_context_dispatch (context); unblock_input (); errno = pselect_errno; - release_select_lock (); } + if (context_acquired) + g_main_context_release (context); + /* To not have to recalculate timeout, return like this. 
*/ if ((our_fds > 0 || (nfds == 0 && tmop == &tmo)) && (retval == 0)) { diff --git a/src/xgselect.h b/src/xgselect.h index 2142a236b2..e00dce1283 100644 --- a/src/xgselect.h +++ b/src/xgselect.h @@ -29,6 +29,4 @@ #define XGSELECT_H fd_set *rfds, fd_set *wfds, fd_set *efds, struct timespec *timeout, sigset_t *sigmask); -extern void release_select_lock (void); - #endif /* XGSELECT_H */ -- 2.26.2 --=-=-= Content-Type: text/plain #2. Fails on tip of master, succeeds after patch in #1. --=-=-= Content-Type: application/emacs-lisp Content-Disposition: attachment; filename=42.el Content-Transfer-Encoding: quoted-printable ;; -*- lexical-binding: t -*- (require 'threads) (require 'eieio) (require 'cl-lib) (require 'ring) (defun debug (fmt &rest args) (princ (apply #'format fmt args) #'external-debugging-output) (terpri #'external-debugging-output)) (define-error 'thread-utils-thread-interrupted "Thread was interrupted" 'error) (defun thread-utils-main-thread-p (&optional object) (let ((object (or object (current-thread)))) (and (threadp object) (eq object (car (all-threads)))))) (defun thread-utils-quittable-apply (fn &rest args) (let* ((this-thread (current-thread)) (quit-thread (make-thread (lambda nil (condition-case nil (cl-loop (sleep-for 3)) (quit (thread-signal this-thread 'quit nil)) (thread-utils-thread-interrupted nil)))))) (unwind-protect (apply fn args) (thread-signal quit-thread 'thread-utils-thread-interrupted nil)))) (defun thread-utils-condition-quittable-wait (condition) (cl-check-type condition condition-variable) (thread-utils-quittable-apply #'condition-wait condition)) (defun thread-utils-condition-wait (condition) (if (thread-utils-main-thread-p) (thread-utils-condition-quittable-wait condition) (condition-wait condition))) (defclass channel-terminal nil ((mutex :initarg :mutex :type mutex) (condition :initarg :condition :type condition-variable) (msg-queue :initarg :msg-queue :type ring) (closed-p :initform nil) (other-terminal :type (or null 
channel-terminal)))) (defclass channel-source (channel-terminal) nil) (defclass channel-sink (channel-terminal) nil) (define-error 'channel-closed "Trying to send/recv from a closed channel") (defun make-channel (capacity) (let* ((mutex (make-mutex "channel")) (condition (make-condition-variable mutex "channel")) (msg-queue (make-ring capacity)) (source (channel-source :mutex mutex :condition condition :msg-queue msg-queue)) (sink (channel-sink :mutex mutex :condition condition :msg-queue msg-queue))) (oset source other-terminal sink) (oset sink other-terminal source) (cons source sink))) (cl-defgeneric channel-send ((source channel-source) message) (with-mutex (oref source mutex) (with-slots (condition msg-queue) source (while (and (not (channel-closed-p source)) (= (ring-size msg-queue) (ring-length msg-queue))) (thread-utils-condition-wait condition)) (when (channel-closed-p source) (signal 'channel-closed (list source))) (let ((inhibit-quit t)) (ring-insert msg-queue message) (when (= 1 (ring-length msg-queue)) (condition-notify condition t))) nil))) (cl-defgeneric channel-recv ((sink channel-terminal)) (with-mutex (oref sink mutex) (with-slots (condition msg-queue) sink (while (and (not (channel-closed-p sink)) (ring-empty-p msg-queue)) (thread-utils-condition-wait condition)) (when (channel-closed-p sink) (signal 'channel-closed (list sink))) (let ((inhibit-quit t)) (prog1 (ring-remove msg-queue) (when (= 1 (- (ring-size msg-queue) (ring-length msg-queue))) (condition-notify condition t))))))) (cl-defgeneric channel-peek ((sink channel-terminal)) (with-mutex (oref sink mutex) (with-slots (condition msg-queue) sink (while (and (not (channel-closed-p sink)) (ring-empty-p msg-queue)) (thread-utils-condition-wait condition)) (when (channel-closed-p sink) (signal 'channel-closed (list sink))) (ring-ref msg-queue -1)))) (cl-defgeneric channel-close ((terminal channel-terminal)) (with-mutex (oref terminal mutex) (with-slots (closed-p condition) terminal (setq 
closed-p t) (condition-notify condition t)) nil)) (cl-defmethod channel-closed-p ((source channel-source)) (with-mutex (oref source mutex) (with-slots (closed-p other-terminal) source (or closed-p (oref other-terminal closed-p))))) (cl-defmethod channel-closed-p ((sink channel-sink)) (with-mutex (oref sink mutex) (with-slots (closed-p other-terminal msg-queue) sink (or closed-p (and (oref other-terminal closed-p) (ring-empty-p msg-queue)))))) (defmacro start-thread (name what) `(make-thread (lambda () (condition-case err (progn (sleep-for (+ 2 (random 3))) (funcall ,what)) (channel-closed (message "wtf %s: %s" ,name (error-message-string err))))) ,name)) (let ((n 3)) (cl-destructuring-bind (source . sink) (make-channel 1) (dotimes (i n) (let ((send-name (format "send-%d" (1+ i))) (recv-name (format "recv-%d" (- n i)))) (start-thread send-name (lambda () (channel-send source (format "data-%d" i)) (debug "%s: sent %s" send-name (format "data-%d" i)))) (start-thread recv-name (lambda () (when-let ((ret (channel-recv sink))) (debug "%s: recv %s" recv-name ret)))))) (start-thread "clear" (lambda () (while (> (length (cl-remove-if (lambda (thr) (eq (current-thread) thr)) (all-threads))) 1) (accept-process-output nil 0.5)) (channel-close source) (channel-close sink))))) (ignore-errors (enable-command 'list-threads)) (call-interactively #'list-threads) --=-=-= Content-Type: text/plain #3. Abridged original example, fails after patch in #1. 
--=-=-= Content-Type: application/emacs-lisp Content-Disposition: attachment; filename=report.el Content-Transfer-Encoding: quoted-printable ;; -*- lexical-binding: t -*- (require 'threads) (require 'eieio) (require 'cl-lib) (require 'ring) (define-error 'thread-utils-thread-interrupted "Thread was interrupted" 'error) (defun thread-utils-main-thread-p (&optional object) (let ((object (or object (current-thread)))) (and (threadp object) (eq object (car (all-threads)))))) (defun thread-utils-quittable-apply (fn &rest args) (let* ((this-thread (current-thread)) (quit-thread (make-thread (lambda nil (condition-case nil (cl-loop (sleep-for 3)) (quit (thread-signal this-thread 'quit nil)) (thread-utils-thread-interrupted nil)))))) (unwind-protect (apply fn args) (thread-signal quit-thread 'thread-utils-thread-interrupted nil)))) (defun thread-utils-condition-quittable-wait (condition) (cl-check-type condition condition-variable) (thread-utils-quittable-apply #'condition-wait condition)) (defun thread-utils-condition-wait (condition) (if (thread-utils-main-thread-p) (thread-utils-condition-quittable-wait condition) (condition-wait condition))) (defconst channel-default-capacity 16) (defclass channel-terminal nil ((mutex :initarg :mutex :type mutex) (condition :initarg :condition :type condition-variable) (msg-queue :initarg :msg-queue :type ring) (closed-p :initform nil) (other-terminal :type (or null channel-terminal)))) (defclass channel-source (channel-terminal) nil) (defclass channel-sink (channel-terminal) nil) (define-error 'channel-closed "Trying to send/recv from a closed channel") (defun make-channel (&optional capacity) (unless capacity (setq capacity channel-default-capacity)) (cl-check-type capacity (integer 1 *)) (let* ((mutex (make-mutex "channel")) (condition (make-condition-variable mutex "channel")) (msg-queue (make-ring capacity)) (source (channel-source :mutex mutex :condition condition :msg-queue msg-queue)) (sink (channel-sink :mutex mutex :condition 
condition :msg-queue msg-queue))) (oset source other-terminal sink) (oset sink other-terminal source) (cons source sink))) (cl-defgeneric channel-send ((source channel-source) message) (with-mutex (oref source mutex) (with-slots (condition msg-queue) source (while (and (not (channel-closed-p source)) (= (ring-size msg-queue) (ring-length msg-queue))) (thread-utils-condition-wait condition)) (when (channel-closed-p source) (signal 'channel-closed (list source))) (let ((inhibit-quit t)) (ring-insert msg-queue message) (when (= 1 (ring-length msg-queue)) (condition-notify condition t))) nil))) (cl-defgeneric channel-recv ((sink channel-terminal)) (with-mutex (oref sink mutex) (with-slots (condition msg-queue) sink (while (and (not (channel-closed-p sink)) (ring-empty-p msg-queue)) (thread-utils-condition-wait condition)) (when (channel-closed-p sink) (signal 'channel-closed (list sink))) (let ((inhibit-quit t)) (prog1 (ring-remove msg-queue) (when (= 1 (- (ring-size msg-queue) (ring-length msg-queue))) (condition-notify condition t))))))) (cl-defgeneric channel-peek ((sink channel-terminal)) (with-mutex (oref sink mutex) (with-slots (condition msg-queue) sink (while (and (not (channel-closed-p sink)) (ring-empty-p msg-queue)) (thread-utils-condition-wait condition)) (when (channel-closed-p sink) (signal 'channel-closed (list sink))) (ring-ref msg-queue -1)))) (cl-defgeneric channel-close ((terminal channel-terminal)) (with-mutex (oref terminal mutex) (with-slots (closed-p condition) terminal (setq closed-p t) (condition-notify condition t)) nil)) (cl-defmethod channel-closed-p ((source channel-source)) (with-mutex (oref source mutex) (with-slots (closed-p other-terminal) source (or closed-p (oref other-terminal closed-p))))) (cl-defmethod channel-closed-p ((sink channel-sink)) (with-mutex (oref sink mutex) (with-slots (closed-p other-terminal msg-queue) sink (or closed-p (and (oref other-terminal closed-p) (ring-empty-p msg-queue)))))) (let ((channel 
(make-channel 1))) (make-thread (lambda nil (channel-send (car channel) 42)) "produce") (channel-recv (cdr channel)) (ignore-errors (enable-command 'list-threads)) (call-interactively #'list-threads)) --=-=-= Content-Type: text/plain Fails not necessarily because xgselect.c is wrong, but rather because channel-recv blocks on a mutex before channel-send can get its act together. This was hard for all to discern because OP seemed to have gone out of his way to obfuscate his "minimum" example. #4. What #3 probably intended, succeeds after patch in #1. --=-=-= Content-Type: application/emacs-lisp Content-Disposition: attachment; filename=report-2.el Content-Transfer-Encoding: quoted-printable ;; -*- lexical-binding: t -*- (require 'threads) (require 'eieio) (require 'cl-lib) (require 'ring) (define-error 'thread-utils-thread-interrupted "Thread was interrupted" 'error) (defun thread-utils-main-thread-p (&optional object) (let ((object (or object (current-thread)))) (and (threadp object) (eq object (car (all-threads)))))) (defun thread-utils-quittable-apply (fn &rest args) (let* ((this-thread (current-thread)) (quit-thread (make-thread (lambda nil (condition-case nil (cl-loop (sleep-for 3)) (quit (thread-signal this-thread 'quit nil)) (thread-utils-thread-interrupted nil)))))) (unwind-protect (apply fn args) (thread-signal quit-thread 'thread-utils-thread-interrupted nil)))) (defun thread-utils-condition-quittable-wait (condition) (cl-check-type condition condition-variable) (thread-utils-quittable-apply #'condition-wait condition)) (defun thread-utils-condition-wait (condition) (if (thread-utils-main-thread-p) (thread-utils-condition-quittable-wait condition) (condition-wait condition))) (defconst channel-default-capacity 16) (defclass channel-terminal nil ((mutex :initarg :mutex :type mutex) (condition :initarg :condition :type condition-variable) (msg-queue :initarg :msg-queue :type ring) (closed-p :initform nil) (other-terminal :type (or null channel-terminal)))) 
(defclass channel-source (channel-terminal) nil) (defclass channel-sink (channel-terminal) nil) (define-error 'channel-closed "Trying to send/recv from a closed channel") (defun make-channel (&optional capacity) (unless capacity (setq capacity channel-default-capacity)) (cl-check-type capacity (integer 1 *)) (let* ((mutex (make-mutex "channel")) (condition (make-condition-variable mutex "channel")) (msg-queue (make-ring capacity)) (source (channel-source :mutex mutex :condition condition :msg-queue msg-queue)) (sink (channel-sink :mutex mutex :condition condition :msg-queue msg-queue))) (oset source other-terminal sink) (oset sink other-terminal source) (cons source sink))) (cl-defgeneric channel-send ((source channel-source) message) (with-mutex (oref source mutex) (with-slots (condition msg-queue) source (while (and (not (channel-closed-p source)) (= (ring-size msg-queue) (ring-length msg-queue))) (thread-utils-condition-wait condition)) (when (channel-closed-p source) (signal 'channel-closed (list source))) (let ((inhibit-quit t)) (ring-insert msg-queue message) (when (= 1 (ring-length msg-queue)) (condition-notify condition t))) nil))) (cl-defgeneric channel-recv ((sink channel-terminal)) (with-mutex (oref sink mutex) (with-slots (condition msg-queue) sink (while (and (not (channel-closed-p sink)) (ring-empty-p msg-queue)) (thread-utils-condition-wait condition)) (when (channel-closed-p sink) (signal 'channel-closed (list sink))) (let ((inhibit-quit t)) (prog1 (ring-remove msg-queue) (when (= 1 (- (ring-size msg-queue) (ring-length msg-queue))) (condition-notify condition t))))))) (cl-defgeneric channel-peek ((sink channel-terminal)) (with-mutex (oref sink mutex) (with-slots (condition msg-queue) sink (while (and (not (channel-closed-p sink)) (ring-empty-p msg-queue)) (thread-utils-condition-wait condition)) (when (channel-closed-p sink) (signal 'channel-closed (list sink))) (ring-ref msg-queue -1)))) (cl-defgeneric channel-close ((terminal 
channel-terminal)) (with-mutex (oref terminal mutex) (with-slots (closed-p condition) terminal (setq closed-p t) (condition-notify condition t)) nil)) (cl-defmethod channel-closed-p ((source channel-source)) (with-mutex (oref source mutex) (with-slots (closed-p other-terminal) source (or closed-p (oref other-terminal closed-p))))) (cl-defmethod channel-closed-p ((sink channel-sink)) (with-mutex (oref sink mutex) (with-slots (closed-p other-terminal msg-queue) sink (or closed-p (and (oref other-terminal closed-p) (ring-empty-p msg-queue)))))) (let ((channel (make-channel 1))) (make-thread (lambda nil (channel-send (car channel) 42)) "produce") (make-thread (lambda nil (sleep-for 2) (channel-recv (cdr channel))) "consume") (ignore-errors (enable-command 'list-threads)) (call-interactively #'list-threads)) --=-=-=--
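
For anyone comparing #3 and #4: a minimal sketch of the same mutex/condition-variable hand-off with both ends in non-main threads, so the main thread never sits in condition-wait itself. `sketch-handoff` and its local names are made up for illustration and are not part of the original report. It assumes Emacs 27 or later, where thread-join returns the thread function's value, and a file evaluated with lexical binding.

```elisp
;; -*- lexical-binding: t -*-
;; Hypothetical illustration; none of these names come from the bug report.
;; Lexical binding is required so both lambdas close over the same `slot'.

(defun sketch-handoff (value)
  "Pass non-nil VALUE from a producer thread to a consumer thread.
Return whatever the consumer received."
  (let* ((mutex (make-mutex "sketch"))
         (ready (make-condition-variable mutex "sketch"))
         (slot nil)
         (consumer
          (make-thread
           (lambda ()
             (with-mutex mutex
               ;; Re-check the predicate after every wakeup.
               (while (null slot)
                 (condition-wait ready))
               slot))
           "consume"))
         (producer
          (make-thread
           (lambda ()
             (with-mutex mutex
               (setq slot value)
               ;; Wake every waiter; each one re-checks `slot'.
               (condition-notify ready t)))
           "produce")))
    ;; The main thread only joins; it never waits on the condition.
    (thread-join producer)
    (thread-join consumer)))
```

Calling (sketch-handoff 42) is meant to hand 42 back through the consumer. Whether it actually completes on a given GTK build is the very thing this bug is about, so treat it as a probe rather than a guarantee.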