English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Анализ механизма сообщения Android 6.0

Сообщения хранятся в одном из очередей сообщений, а сообщение циклической нити окружает эту очередь сообщений, до тех пор, пока нить не выйдет из цикла. Если в очереди есть сообщения, сообщение циклической нити будет их извлекать и направлять соответствующему обработчику для обработки; если в очереди нет сообщений, сообщение циклической нити перейдет в состояние ожидания,等待下一个消息的到来。При разработке приложений для Android, когда задачи выполняемые приложением становятся слишком сложными, чтобы не блокировать основную нить UI и не вызвать ANR, мы обычно создаем поднить для выполнения специфических задач. При создании поднити есть два варианта: один из них - это создание поднити без циклической нити, создавая объект Thread; другой вариант - это создание поднити с циклической нитью, и создание поднити с циклической нитью возможно двумя способами: один из них - это использование класса HandlerThread, предоставленного Android, для создания объекта нити с циклической нитью, а другой способ - это запуск циклической нити в методе run() нити с использованием следующего способа: 

1. Механизм сообщений 

Обычно сообщения состоят из одной的消息 потока и хендлера,让我们 посмотрим на сообщение хендлер PowerManagerService:        

 mHandlerThread = new ServiceThread(TAG,
        Process.THREAD_PRIORITY_DISPLAY, false /*allowIo*/);
    mHandlerThread.start();
    mHandler = new PowerManagerHandler(mHandlerThread.getLooper()); 

здесь ServiceThread - это HandlerThread, при создании хендлера необходимо передать лупер HandlerThread, в противном случае используется лупер текущего потока. 

а каждый хендлер, примерно так:

   private final class PowerManagerHandler extends Handler {
    public PowerManagerHandler(Looper looper) {
      super(looper, null, true /*async*/);
    }
    @Override
    public void handleMessage(Message msg) {
      switch (msg.what) {
        case MSG_USER_ACTIVITY_TIMEOUT:
          handleUserActivityTimeout();
          break;
        case MSG_SANDMAN:
          handleSandman();
          break;
        case MSG_SCREEN_BRIGHTNESS_BOOST_TIMEOUT:
          handleScreenBrightnessBoostTimeout();
          break;
        case MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT:
          checkWakeLockAquireTooLong();
          Message m = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT);
          m.setAsynchronous(true);
          mHandler.sendMessageDelayed(m, WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);
          break;
      }
    }
  }

二、消息机制原理
那我们先来看下HandlerThread的主函数run函数: 

 public void run() {
    mTid = Process.myTid();
    Looper.prepare();
    synchronized (this) {
      mLooper = Looper.myLooper();//赋值后notifyall,主要是getLooper函数返回的是mLooper
      notifyAll();
    }
    Process.setThreadPriority(mPriority);
    onLooperPrepared();
    Looper.loop();
    mTid = -1;
  }

再来看看Lopper的prepare函数,最后新建了一个Looper对象,并且放在线程的局部变量中。

public static void prepare() {
    prepare(true);
  }
  private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
      throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
  } 

Looper的构造函数中创建了MessageQueue

   private Looper(boolean quitAllowed) {
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
  } 

我们又来看一下MessageQueue的构造函数,其中nativeInit是一个native方法,并且把返回值保存在mPtr显然是用long型变量保存的指针

MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();
  } 

В native функции создается объект NativeMessageQueue и возвращает переменную указателя.

 static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
  NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
  if (!nativeMessageQueue) {
    jniThrowRuntimeException(env, "Unable to allocate native queue");
    return 0;
  }
  nativeMessageQueue->incStrong(env);
  return reinterpret_cast<jlong>(nativeMessageQueue);
} 

Конструктор NativeMessageQueue��取mLooper, если его нет, то создает новый Looper 

NativeMessageQueue::NativeMessageQueue() :
    mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
  mLooper = Looper::getForThread();
  if (mLooper == NULL) {
    mLooper = new Looper(false);
    Looper::setForThread(mLooper);
  }
}

Затем посмотрим на конструктор Looper, который показывает, что был вызван eventfd для создания fd, eventfd主要用于进程或线程间的通信, можно посмотреть на эту статью о介绍 eventfd

 Looper::Looper(bool allowNonCallbacks) :
    mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
    mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),}
    mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
  mWakeEventFd = eventfd(0, EFD_NONBLOCK);
  LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd. errno=%d", errno);
  AutoMutex _l(mLock);
  rebuildEpollLocked();
}

2.1 Create epoll in c layer 

Let's take a look at the rebuildEpollLocked function, which creates an epoll, adds mWakeEventFd to epoll, and also adds mRequests's fd to epoll

 void Looper::rebuildEpollLocked() {
  // Close old epoll instance if we have one.
  if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
    ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
    close(mEpollFd);
  }
  // Allocate the new epoll instance and register the wake pipe.
  mEpollFd = epoll_create(EPOLL_SIZE_HINT);
  LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
  struct epoll_event eventItem;
  memset(&eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
  eventItem.events = EPOLLIN;
  eventItem.data.fd = mWakeEventFd;
  int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, &eventItem);
  LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance. errno=%d",
      errno);
  for (size_t i = 0; i < mRequests.size(); i++) {
    const Request& request = mRequests.valueAt(i);
    struct epoll_event eventItem;
    request.initEventItem(&eventItem);
    int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
    if (epollResult < 0) {
      ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d",
          request.fd, errno);
    }
  }
} 

Продолжаем возвращаться в функцию run HandlerThread, и продолжаем анализировать функцию loop Looper

public void run() {
    mTid = Process.myTid();
    Looper.prepare();
    synchronized (this) {
      mLooper = Looper.myLooper();
      notifyAll();
    }
    Process.setThreadPriority(mPriority);
    onLooperPrepared();
    Looper.loop();
    mTid = -1;
  } 

Позвольте посмотрим на функцию loop Looper:

public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
      throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue; // Получаем mQueue Looper
    // Убедитесь, что идентификация этой нити соответствует локальному процессу,
    // и отслеживать, что на самом деле такое идентификационный токен.
    Binder.clearCallingIdentity();
    final long ident = Binder.clearCallingIdentity();
    for (;;) {
      Message msg = queue.next(); // может блокировать, блокировка в основном epoll_wait
      if (msg == null) {
        // No message indicates that the message queue is quitting.
        return;
      }
      // This must be in a local variable, in case a UI event sets the logger
      Printer logging = me.mLogging; // сделано само собой
      if (logging != null) {
        logging.println(">>>>> Dispatching to " + msg.target + " "+
            msg.callback + ": " + msg.what);
      }
      msg.target.dispatchMessage(msg);
      if (logging != null) {
        logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
      }
      // Make sure that during the course of dispatching the
      // identity of the thread wasn't corrupted.
      final long newIdent = Binder.clearCallingIdentity();
      if (ident != newIdent) {
        Log.wtf(TAG, "Thread identity changed from 0x"
            + Long.toHexString(ident) + " to 0x"
            + Long.toHexString(newIdent) + " while dispatching to ",
            + msg.target.getClass().getName() + " ",
            + msg.callback + " what=" + msg.what);
      }
      msg.recycleUnchecked();
    }
  }

Функция next класса MessageQueue主要是 вызывает.nativePollOnce функцию, а затем извлекает Message из очереди сообщений

Message next() {
    // Возврат к этому месту, если сообщение loop уже quit и было disposed.
    // Это может произойти, если приложение пытается перезапустить looper после quit
    // которая не поддерживается.
    final long ptr = mPtr; // сохраненный ранее указатель
    if (ptr == 0) {
      return null;
    }
    int pendingIdleHandlerCount = -1; // -1 только во время первой итерации
    int nextPollTimeoutMillis = 0;
    for (;;) {
      if (nextPollTimeoutMillis != 0) {
        Binder.flushPendingCommands();
      }
      nativePollOnce(ptr, nextPollTimeoutMillis); 

Ниже мы рассмотрим.nativePollOnce этой.native функции, принудительно преобразовав предыдущий указатель в NativeMessageQueue, а затем вызывая его функцию pollOnce

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
    jlong ptr, jint timeoutMillis) {
  NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
  nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

2.2 блокировка epoll_wait на уровне c 

Функция pollOnce,一般在函数前面的while通常没有,只是处理了indent больше 0的情况,这种情况通常没有,所以我们 можем сразу смотреть на функцию pollInner

 int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
  int result = 0;
  for (;;) {
    while (mResponseIndex < mResponses.size()) {
      const Response& response = mResponses.itemAt(mResponseIndex++);
      int ident = response.request.ident;
      if (ident >= 0) {
        int fd = response.request.fd;
        int events = response.events;
        void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - возвращаемый идентификатор сигнала %d: "
            "fd=%d, events=0x%x, data=%p",
            this, ident, fd, events, data);
#endif
        if (outFd != NULL) *outFd = fd;
        if (outEvents != NULL) *outEvents = events;
        if (outData != NULL) *outData = data;
        return ident;
      }
    }
    if (result != 0) {
#if DEBUG_POLL_AND_WAKE
      ALOGD("%p ~ pollOnce - возвращаемый результат %d", this, result);
#endif
      if (outFd != NULL) *outFd = 0;
      if (outEvents != NULL) *outEvents = 0;
      if (outData != NULL) *outData = NULL;
      return result;
    }
    result = pollInner(timeoutMillis);
  }
} 

Функция pollInner主要负责调用epoll_wait в режиме блокировки, и java-слой будет вычислять время блокировки每次调用并将其传到 c-слою, ожидая的到来mWakeEventFd или события, поступающие от fd, добавленных ранее, после чего epoll_wait вернется. 

int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
  ALOGD("%p ~ pollOnce - ждем: timeoutMillis=%d", this, timeoutMillis);
#endif
  // Корректируем время ожидания на основе времени следующего сообщения.
  if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
    if (messageTimeoutMillis >= 0
        && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
      timeoutMillis = messageTimeoutMillis;
    }
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - следующее сообщение через %" PRId64 "ns, откорректированное время ожидания: timeoutMillis=%d",
        this, mNextMessageUptime - now, timeoutMillis);
#endif
  }
  // Poll.
  int result = POLL_WAKE;
  mResponses.clear(); // Очищаем mResponses
  mResponseIndex = 0;
  // Мы вот-вот простаиваем.
  mPolling = true;
  struct epoll_event eventItems[EPOLL_MAX_EVENTS];
  int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // epoll_wait блокируется в основной THREAD, время блокировки также передается из JAVA слоя
  // Дольше не простаивает.
  mPolling = false;
  // Получить блок.
  mLock.lock();
  // Перестроить установку epoll, если нужно.
  if (mEpollRebuildRequired) {
    mEpollRebuildRequired = false;
    rebuildEpollLocked();
    goto Done;
  }
  // Проверитьошибку poll.
  if (eventCount < 0) {
    if (errno == EINTR) {
      goto Done;
    }
    ALOGW("Poll не удалось с неожиданной ошибкой, errno=%d", errno);
    result = POLL_ERROR;
    goto Done;
  }
  // Проверитьtimeout poll.
  if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - timeout", this);
#endif
    result = POLL_TIMEOUT;
    goto Done;
  }
  // Обработать все события.
#if DEBUG_POLL_AND_WAKE
  ALOGD("%p ~ pollOnce - обработка событий от %d fds", this, eventCount);
#endif
  for (int i = 0; i < eventCount; i++) {
    int fd = eventItems[i].data.fd;
    uint32_t epollEvents = eventItems[i].events;
    if (fd == mWakeEventFd) { // Событие уведомления о пробуждении потока
      if (epollEvents & EPOLLIN) {
        awoken();
      } else {
        ALOGW("Игнорирование неожиданных событий epoll 0x%x при событии пробуждения fd.", epollEvents);
      }
    } else {
      ssize_t requestIndex = mRequests.indexOfKey(fd); // Предыдущие события addFd
      if (requestIndex >= 0) {
        int events = 0;
        if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
        if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
        if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
        if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
        pushResponse(events, mRequests.valueAt(requestIndex)); // Поместить в mResponses
      } else {
        ALOGW("Пропуск意外的 epoll событий 0x%x на fd %d, что "
            "дольше не зарегистрирован", epollEvents, fd);
      }
    }
  }
Done: ;
  // Вызов вызовов回调 сообщений.
  mNextMessageUptime = LLONG_MAX;
  while (mMessageEnvelopes.size() != 0) { // This section mainly deals with C-level messages, while Java-level messages are managed independently
    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
    if (messageEnvelope.uptime <= now) {
      // Remove the envelope from the list.
      // We keep a strong reference to the handler until the call to handleMessage
      // finishes. Then we drop it so that the handler can be deleted *before*
      // we reacquire our lock.
      { // obtain handler
        sp<MessageHandler> handler = messageEnvelope.handler;
        Message message = messageEnvelope.message;
        mMessageEnvelopes.removeAt(0);
        mSendingMessage = true;
        mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
        ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
            this, handler.get(), message.what);
#endif
        handler->handleMessage(message);
      } // освободить обработчик
      mLock.lock();
      mSendingMessage = false;
      result = POLL_CALLBACK;
    } else {
      // Последнее сообщение, оставшееся в голове очереди, определяет следующее время пробуждения.
      mNextMessageUptime = messageEnvelope.uptime;
      break;
    }
  }
  // Release lock.
  mLock.unlock();
  // Invoke all response callbacks.
  for (size_t i = 0; i < mResponses.size(); i++) { // This is the handling of the previous addFd event, mainly iterating over mResponses and calling its callback
    Response& response = mResponses.editItemAt(i);
    if (response.request.ident == POLL_CALLBACK) {
      int fd = response.request.fd;
      int events = response.events;
      void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
      ALOGD("%p ~ pollOnce - вызов кэббека события fd %p: fd=%d, events=0x%x, data=%p",
          this, response.request.callback.get(), fd, events, data);
#endif
      // Вызов обратного вызова. Обратите внимание, что дескриптор файла может быть закрыт
      // обратный вызов (и возможно даже повторно используемый) до возвращения функции так
      // нужно быть немного внимательным при удалении дескриптора файла afterwards.
      int callbackResult = response.request.callback->handleEvent(fd, events, data);
      if (callbackResult == 0) {
        removeFd(fd, response.request.seq);
      }
      // Срочно удалить ссылку на обратный вызов в структуре ответа, так как мы
      // не очищать сам вектор ответа до следующего опроса.
      response.request.callback.clear();
      result = POLL_CALLBACK;
    }
  }
  return result;
} 

Continue analyzing the loop function of Looper, you can add your own print to debug the code, previously calling Message's target's dispatchMessage to distribute messages

     for (;;) {
      Message msg = queue.next(); // might block
      if (msg == null) {
        // No message indicates that the message queue is quitting.
        return;
      }
      // This must be in a local variable, in case a UI event sets the logger
      Printer logging = me.mLogging;//свой принтер
      if (logging != null) {
        logging.println(">>>>> Dispatching to " + msg.target + " "+
            msg.callback + ": " + msg.what);
      }
      msg.target.dispatchMessage(msg);
      if (logging != null) {
        logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
      }
      // Make sure that during the course of dispatching the
      // identity of the thread wasn't corrupted.
      final long newIdent = Binder.clearCallingIdentity();
      if (ident != newIdent) {
        Log.wtf(TAG, "Thread identity changed from 0x"
            + Long.toHexString(ident) + " to 0x"
            + Long.toHexString(newIdent) + " while dispatching to ",
            + msg.target.getClass().getName() + " ",
            + msg.callback + " what=" + msg.what);
      }
      msg.recycleUnchecked();
    }
  }

2.3 Добавление отладочной печати 

Давайте сначала посмотрим, как можно добавить печать, это можно сделать через функцию setMessageLogging класса Lopper

public void setMessageLogging(@Nullable Printer printer) {
    mLogging = printer;
  } 
Printer - это interface
public interface Printer {
  /**
   * Записать строку текста в вывод. Не нужно заканчивать
   * Введенная строка с новым строкой.
   */
  void println(String x);
}

2.4 Обработка分发 сообщений в层面 java 

Теперь рассмотрим分发 сообщений, сначала вызывается функция obtainMessage класса Handler               

 Message msg = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT);
 msg.setAsynchronous(true);
 mHandler.sendMessageDelayed(msg, WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT); 

Сначала посмотрим, что функция obtainMessage вызывает функцию obtain класса Message

public final Message obtainMessage(int what)
  {
    return Message.obtain(this, what);
  } 

Функция obtain класса Message создает новый Message, затем его target устанавливается в Handler

public static Message obtain(Handler h, int what) {
    Message m = obtain();//это создание нового Message
    m.target = h;
    m.what = what;
    return m;
  }

Давайте рассмотрим предыдущую рассылку сообщений 

msg.target.dispatchMessage(msg);в конце это вызывается функцией dispatchMessage Handler, в Handler в конце будет обработано сообщение в зависимости от различных ситуаций.

   public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
      handleCallback(msg);//это отправка с помощью post, сRunnable
    } else {
      if (mCallback != null) {//это когда handler передается параметром mCallback с обратным вызовом
        if (mCallback.handleMessage(msg)) {
          return;
        }
      }
      handleMessage(msg);//в конце это обработается в реализованном нами handleMessage
    }
  }

2.3 Отправка сообщений в java-уровне 

Давайте рассмотрим отправку сообщений в java-уровне, которая также в основном вызывает функции sendMessage post Handler, в конечном итоге вызывая эту функцию

   public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
      RuntimeException e = new RuntimeException(
          this + " sendMessageAtTime() called with no mQueue");
      Log.w("Looper", e.getMessage(), e);
      return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
  } 

Давайте рассмотрим, что в java-уровне отправка сообщений в конечном итоге вызывает функцию enqueueMessage

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
      msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
  } 

В конце концов, в enqueueMessage добавьте сообщение в очередь сообщений, а затем, если нужно, вызовите функцию nativeWake в层的 C

boolean enqueueMessage(Message msg, long when) {
    if (msg.target == null) {
      throw new IllegalArgumentException("Message must have a target.");
    }
    if (msg.isInUse()) {
      throw new IllegalStateException(msg + " This message is already in use.");
    }
    synchronized (this) {
      if (mQuitting) {
        IllegalStateException e = new IllegalStateException(
            msg.target + " sending message to a Handler on a dead thread");
        Log.w(TAG, e.getMessage(), e);
        msg.recycle();
        return false;
      }
      msg.markInUse();
      msg.when = when;
      Message p = mMessages;
      boolean needWake;
      if (p == null || when == 0 || when < p.when) {
        // Новая голова, если она заблокирована, разбудите очередь событий
        msg.next = p;
        mMessages = msg;
        needWake = mBlocked;
      } else {
        // Вставлен в середину очереди. Обычно мы не должны будить
        // Вверх по очереди событий, если только в голове очереди нет барьера
        // и это сообщение является earliest асинхронным сообщением в очереди.
        needWake = mBlocked && p.target == null && msg.isAsynchronous();
        Message prev;
        for (;;) {
          prev = p;
          p = p.next;
          if (p == null || when < p.when) {
            break;
          }
          if (needWake && p.isAsynchronous()) {
            needWake = false;
          }
        }
        msg.next = p; // постулат: p == prev.next
        prev.next = msg;
      }
      // Мы можем предположить, что mPtr != 0, потому что mQuitting равен false.
      if (needWake) {
        nativeWake(mPtr);
      }
    }
    return true;
  } 

Посмотрим на этот native метод, в конечном итоге он также вызывает функцию wake Looper

 static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
  NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
  nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
  mLooper->wake();
} 

Looper класса wake выполняет функцию записи содержимого в mWakeEventfd, этот fd используется только для уведомления, как pipe, в конечном итоге онбудет вызывать epoll_wait, что предотвратит блокирование потока, и线程 продолжит отправлять сообщения c-уровня, а затем обработать события, добавленные в fd, и обработать сообщения java-уровня. 

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
  ALOGD("%p ~ wake", this);
#endif
  uint64_t inc = 1;
  ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
  if (nWrite != sizeof(uint64_t)) {
    if (errno != EAGAIN) {}}
      ALOGW("Не удалось записать сигнал пробуждения, errno=%d", errno);
    }
  }
}

2.4 Отправка сообщения в c-слое 

В c-слое также можно отправлять сообщения, в основном это вызов функции sendMessageAtTime Looper, параметр handler является回调ом, мы помещаем сообщение в mMessageEnvelopes.

 void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
    const Message& message) {
#if DEBUG_CALLBACKS
  ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",
      this, uptime, handler.get(), message.what);
#endif
  size_t i = 0;
  { // acquire lock
    AutoMutex _l(mLock);
    size_t messageCount = mMessageEnvelopes.size();
    while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
      i += 1;
    }
    MessageEnvelope messageEnvelope(uptime, handler, message);
    mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
    // Оптимизация: Если Looper в настоящее время отправляет сообщение, то мы можем пропустить
    // вызов wake() потому что следующим делом Looper будет после обработки
    // сообщения предназначены для определения времени следующего пробуждения. На самом деле, это
    // не имеет значения, запущен ли этот код на потоке Looper.
    if (mSendingMessage) {
      return;
    }
  }
  // Wake the poll loop only when we enqueue a new message at the head.
  if (i == 0) {
    wake();
  }
} 

When in pollOnce, after epoll_wait, it will iterate through the messages in mMessageEnvelopes and then call the handleMessage function of its handler

   while (mMessageEnvelopes.size() != 0) {
    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
    if (messageEnvelope.uptime <= now) {
      // Remove the envelope from the list.
      // We keep a strong reference to the handler until the call to handleMessage
      // finishes. Then we drop it so that the handler can be deleted *before*
      // we reacquire our lock.
      { // obtain handler
        sp<MessageHandler> handler = messageEnvelope.handler;
        Message message = messageEnvelope.message;
        mMessageEnvelopes.removeAt(0);
        mSendingMessage = true;
        mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
        ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
            this, handler.get(), message.what);
#endif
        handler->handleMessage(message);
      } // освободить обработчик
      mLock.lock();
      mSendingMessage = false;
      result = POLL_CALLBACK;
    } else {
      // Последнее сообщение, оставшееся в голове очереди, определяет следующее время пробуждения.
      mNextMessageUptime = messageEnvelope.uptime;
      break;
    }
  } 

В файле Looper_test.cpp介绍了很多Looper的使用方法, давайте рассмотрим

   sp<StubMessageHandler> handler = new StubMessageHandler();
  mLooper->sendMessageAtTime(now + ms2ns(100), handler, Message(MSG_TEST1)); 
StubMessageHandler наследуется от MessageHandler и должен реализовать метод handleMessage
class StubMessageHandler : public MessageHandler {
public:
  Vector<Message> messages;
  virtual void handleMessage(const Message& message) {
    messages.push(message);
  }
}; 

Мы также рассмотрим классы Message и MessageHandler

 struct Message {
  Message() : what(0) { }
  Message(int what) : what(what) { }
  /* Тип сообщения. (толкование оставляется на усмотрение обработчика) */
  int what;
};
/**
 * Интерфейс для обработчика сообщений Looper.
 *
 * Looper сохраняет сильную ссылку на обработчик сообщений, когда у него есть
 * сообщение для доставки. Убедитесь, что вы вызвали Looper::removeMessages
 To remove any pending messages destined for the handler so that the handler
 Can be destroyed.
 */
class MessageHandler : public virtual RefBase {
protected:
  virtual ~MessageHandler() { }
public:
  /**
   Handles a message.
   */
  virtual void handleMessage(const Message& message) = 0;
};

2.5 c layer addFd 

We can also add fd to the epoll thread in addFd of Looper.cpp, and when fd has data, we can also handle the corresponding data. Let's first look at the addFd function, and we notice that there is a callBack callback

 int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
  return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
}
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKS
  ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
      events, callback.get(), data);
#endif
  if (!callback.get()) {
    if (! mAllowNonCallbacks) {
      ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
      return -1;
    }
    if (ident < 0) {
      ALOGE("Invalid attempt to set NULL callback with ident < 0.");
      return -1;
    }
  } else {
    ident = POLL_CALLBACK;
  }
  { // acquire lock
    AutoMutex _l(mLock);
    Request request;
    request.fd = fd;
    request.ident = ident;
    request.events = events;
    request.seq = ++mNextRequestSeq;
    request.callback = callback;
    request.data = data;
    if (mNextRequestSeq == -1) mNextRequestSeq = 0; // зарезервировать номер последовательности -1
    struct epoll_event eventItem;
    request.initEventItem(&eventItem);
    ssize_t requestIndex = mRequests.indexOfKey(fd);
    if (requestIndex < 0) {
      int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem); // добавить в epoll
      if (epollResult < 0) {
        ALOGE("Ошибка добавления epoll событий для fd %d, errno=%d", fd, errno);
        return -1;
      }
      mRequests.add(fd, request); // добавить в mRequests
    } else {
      int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem); // обновление
      if (epollResult < 0) {
        if (errno == ENOENT) {
          // Терпение ENOENT, так как это означает, что был создан более старый дескриптор файла
          // закрыт до того, как был unregister, и в то же время был создан новый
          // был создан дескриптор файла с таким же номером и теперь
          // зарегистрирован впервые. Эта ошибка может возникнуть естественным образом
          // когда обратный вызов имеет побочный эффект закрытия дескриптора файла
          // перед тем как вернуться и.unregister.
          // проверяет, что задача добросовестна.
          //}}
          // Unfortunately due to kernel limitations we need to rebuild the epoll
          // set from scratch because it may contain an old file handle that we are
          // now unable to remove since its file descriptor is no longer valid.
          // No such problem would have occurred if we were using the poll system
          // call instead, but that approach carries others disadvantages.
#if DEBUG_CALLBACKS
          ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "
              "being recycled, falling back on EPOLL_CTL_ADD, errno=%d",
              this, errno);
#endif
          epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
          if (epollResult < 0) {
            ALOGE("Error modifying or adding epoll events for fd %d, errno=%d",
                fd, errno);
            return -1;
          }
          scheduleEpollRebuildLocked();
        } else {
          ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
          return -1;
        }
      }
      mRequests.replaceValueAt(requestIndex, request);
    }
  }
  return 1;
} 

В функции pollOnce мы сначала находим соответствующий fd в mRequests, затем в pushResponse создаем новый Response и затем совмещаем Response с Request.

     } else {
      ssize_t requestIndex = mRequests.indexOfKey(fd);
      if (requestIndex >= 0) {
        int events = 0;
        if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
        if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
        if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
        if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
        pushResponse(events, mRequests.valueAt(requestIndex));
      } else {
        ALOGW("Пропуск意外的 epoll событий 0x%x на fd %d, что "
            "дольше не зарегистрирован", epollEvents, fd);
      }
    } 

Мы遍历mResponses и вызываем кэббек в request

   for (size_t i = 0; i < mResponses.size(); i++) {
    Response& response = mResponses.editItemAt(i);
    if (response.request.ident == POLL_CALLBACK) {
      int fd = response.request.fd;
      int events = response.events;
      void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
      ALOGD("%p ~ pollOnce - вызов кэббека события fd %p: fd=%d, events=0x%x, data=%p",
          this, response.request.callback.get(), fd, events, data);
#endif
      // Вызов обратного вызова. Обратите внимание, что дескриптор файла может быть закрыт
      // обратный вызов (и возможно даже повторно используемый) до возвращения функции так
      // нужно быть немного внимательным при удалении дескриптора файла afterwards.
      int callbackResult = response.request.callback->handleEvent(fd, events, data);
      if (callbackResult == 0) {
        removeFd(fd, response.request.seq);
      }
      // Срочно удалить ссылку на обратный вызов в структуре ответа, так как мы
      // не очищать сам вектор ответа до следующего опроса.
      response.request.callback.clear();
      result = POLL_CALLBACK;
    }
  } 

Давайте также посмотрим, как используется Looper_test.cpp?

   Pipe pipe;
  StubCallbackHandler handler(true);
  handler.setCallback(mLooper, pipe.receiveFd, Looper::EVENT_INPUT); 

Посмотрим на функцию setCallback класса handler

class CallbackHandler {
public:
  void setCallback(const sp<Looper>& looper, int fd, int events) {
    looper->addFd(fd, 0, events, staticHandler, this); // это вызов функции addFd looper с обратным вызовом
  }
protected:
  virtual ~CallbackHandler() { }
  virtual int handler(int fd, int events) = 0;
private:
  static int staticHandler(int fd, int events, void* data) { // Это обратный вызов функции
    return static_cast<CallbackHandler*>(data)->handler(fd, events);
  }
};
class StubCallbackHandler : public CallbackHandler {
public:
  int nextResult;
  int callbackCount;
  int fd;
  int events;
  StubCallbackHandler(int nextResult) : nextResult(nextResult),
      callbackCount(0), fd(-1), events(-1) {
  }
protected:
  virtual int handler(int fd, int events) { // Это вызывается через обратный вызов функции
    callbackCount += 1;
    this->fd = fd;
    this->events = events;
    return nextResult;
  }
}; 

Давайте рассмотрим addFd Looper вместе, когда callback есть, мы создаем новый SimpleLooperCallback

 int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
  return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
} 

Здесь Looper_callbackFunc является typedef
typedef int (*Looper_callbackFunc)(int fd, int events, void* data);

Новый взгляд на SimpleLooperCallback

 class SimpleLooperCallback : public LooperCallback {
protected:
  virtual ~SimpleLooperCallback();
public:
  SimpleLooperCallback(Looper_callbackFunc callback);
  virtual int handleEvent(int fd, int events, void* data);
private:
  Looper_callbackFunc mCallback;
};SimpleLooperCallback::SimpleLooperCallback(Looper_callbackFunc callback) :
    mCallback(callback) {
}
SimpleLooperCallback::~SimpleLooperCallback() {
}
int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
  return mCallback(fd, events, data);
} 

В конечном итоге мы вызываем callback->handleEvent(fd, events, data), где callback это SimpleLooperCallback, а data это переданный ранее указатель на CallbackHandler
 Таким образом, в конечном итоге вызывается staticHandler, а data->handler, это this->handler, и в конечном итоге вызывается функция handler в StubCallbackHandler 

Конечно, мы можем обойтись и без такой сложности, просто использовать вторую функцию addFd, и, конечно, нам нужно будет самим определить класс для callBack, чтобы реализовать LooperCallBack, что значительно упростит процесс
 int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);

2.6 Добавление fd в java-слое 

Долгое время считал, что можно добавить fd только в Looper c-слоя, но оказывается, что это можно сделать и в java-слое через JNI 

Мы можем реализовать эту функцию через addOnFileDescriptorEventListener в MessageQueue

   public void addOnFileDescriptorEventListener(@NonNull FileDescriptor fd,)
      @OnFileDescriptorEventListener.Events int events,
      @NonNull OnFileDescriptorEventListener listener) {
    if (fd == null) {
      throw new IllegalArgumentException("fd must not be null");
    }
    if (listener == null) {
      throw new IllegalArgumentException("listener must not be null");
    }
    synchronized (this) {
      updateOnFileDescriptorEventListenerLocked(fd, events, listener);
    }
  }

назовем OnFileDescriptorEventListener этот回调

   public interface OnFileDescriptorEventListener {
    public static final int EVENT_INPUT = 1 << 0;
    public static final int EVENT_OUTPUT = 1 << 1;
    public static final int EVENT_ERROR = 1 << 2;
    /** @hide */
    @Retention(RetentionPolicy.SOURCE)
    @IntDef(flag=true, value={EVENT_INPUT, EVENT_OUTPUT, EVENT_ERROR})
    public @interface Events {}
    @Events int onFileDescriptorEvents(@NonNull FileDescriptor fd, @Events int events);
  }

затем был вызван updateOnFileDescriptorEventListenerLocked функцией

 private void updateOnFileDescriptorEventListenerLocked(FileDescriptor fd, int events,
      OnFileDescriptorEventListener listener) {
    final int fdNum = fd.getInt$();
    int index = -1;
    FileDescriptorRecord record = null;
    if (mFileDescriptorRecords != null) {
      index = mFileDescriptorRecords.indexOfKey(fdNum);
      if (index >= 0) {
        record = mFileDescriptorRecords.valueAt(index);
        if (record != null && record.mEvents == events) {
          return;
        }
      }
    }
    if (events != 0) {
      events |= OnFileDescriptorEventListener.EVENT_ERROR;
      if (record == null) {
        if (mFileDescriptorRecords == null) {
          mFileDescriptorRecords = new SparseArray<FileDescriptorRecord>();
        }
        record = new FileDescriptorRecord(fd, events, listener);// fd сохраняется в объекте FileDescriptorRecord
        mFileDescriptorRecords.put(fdNum, record);// mFileDescriptorRecords затем сохраняется
      } else {
        record.mListener = listener;
        record.mEvents = events;
        record.mSeq += 1;
      }
      nativeSetFileDescriptorEvents(mPtr, fdNum, events);// вызов native функции
    } else if (record != null) {
      record.mEvents = 0;
      mFileDescriptorRecords.removeAt(index);
    }
  } 

native в конце вызвал функцию setFileDescriptorEvents класса NativeMessageQueue 

static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz,
    jlong ptr, jint fd, jint events) {
  NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
  nativeMessageQueue->setFileDescriptorEvents(fd, events);
}

функция setFileDescriptorEvents, эта addFd является вторым вызовом addFd, поэтому мы можем быть уверены, что NativeMessageQueue наследует LooperCallback

 void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {
  if (events) {
    int looperEvents = 0;
    if (events & CALLBACK_EVENT_INPUT) {
      looperEvents |= Looper::EVENT_INPUT;
    }
    if (events & CALLBACK_EVENT_OUTPUT) {
      looperEvents |= Looper::EVENT_OUTPUT;
    }
    mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this,
        reinterpret_cast<void*>(events));
  } else {
    mLooper->removeFd(fd);
  }
}

Конечно, нужно реализовать функцию handleEvent

 class NativeMessageQueue : public MessageQueue, public LooperCallback {
public:
  NativeMessageQueue();
  virtual ~NativeMessageQueue();
  virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);
  void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);
  void wake();
  void setFileDescriptorEvents(int fd, int events);
  virtual int handleEvent(int fd, int events, void* data);

handleEvent это функция, которая вызывается в looper после epoll_wait, когда у нас есть данные на добавленном fd

 int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) {
  int events = 0;
  if (looperEvents & Looper::EVENT_INPUT) {
    events |= CALLBACK_EVENT_INPUT;
  }
  if (looperEvents & Looper::EVENT_OUTPUT) {
    events |= CALLBACK_EVENT_OUTPUT;
  }
  if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) {
    events |= CALLBACK_EVENT_ERROR;
  }
  int oldWatchedEvents = reinterpret_cast<intptr_t>(data);
  int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj,
      gMessageQueueClassInfo.dispatchEvents, fd, events); // вызов回调
  if (!newWatchedEvents) {
    return 0; // unregister the fd
  }
  if (newWatchedEvents != oldWatchedEvents) {
    setFileDescriptorEvents(fd, newWatchedEvents);
  }
  return 1;
}

В конце dispatchEvents в java MessageQueue на самом деле это вызов обратно на JNI-уровень, а затем вызов зарегистрированных回调-функций

// Вызвано из.native кода.
  private int dispatchEvents(int fd, int events) {
    // Получить запись дескриптора файла и любое состояние, которое может измениться.
    final FileDescriptorRecord record;
    final int oldWatchedEvents;
    final OnFileDescriptorEventListener listener;
    final int seq;
    synchronized (this) {
      record = mFileDescriptorRecords.get(fd);// через fd получить FileDescriptorRecord 
      if (record == null) {
        return 0; // ложные, не зарегистрирован слушатель
      }
      oldWatchedEvents = record.mEvents;
      events &= oldWatchedEvents; // фильтровать события на основе текущего набора наблюдаемых событий
      if (events == 0) {
        return oldWatchedEvents; // ложные, изменились наблюдаемые события
      }
      listener = record.mListener;
      seq = record.mSeq;
    }
    // Вызов слушателя вне блокировки.
    int newWatchedEvents = listener.onFileDescriptorEvents(//listener回调
        record.mDescriptor, events);
    if (newWatchedEvents != 0) {
      newWatchedEvents |= OnFileDescriptorEventListener.EVENT_ERROR;
    }
    // Обновить запись дескриптора файла, если слушатель изменил набор
    // наблюдаемые события и сам слушатель не обновлялись с тех пор.
    if (newWatchedEvents != oldWatchedEvents) {
      synchronized (this) {
        int index = mFileDescriptorRecords.indexOfKey(fd);
        if (index >= 0 && mFileDescriptorRecords.valueAt(index) == record
            && record.mSeq == seq) {
          record.mEvents = newWatchedEvents;
          if (newWatchedEvents == 0) {
            mFileDescriptorRecords.removeAt(index);
          }
        }
      }
    }
    // Возврат нового набора событий для.native, чтобы заняться этим.
    return newWatchedEvents;
  }

Вот весь контент статьи, мы надеемся, что он поможет вам в изучении, и我们也 надеемся на вашу поддержку и поддержку учебника Yana.

Заявление: контент этой статьи взят из Интернета, авторские права принадлежат соответствующему автору. Контент предоставлен пользователями Интернета, самостоятельно загружен, сайт не имеет права собственности, не был обработан вручную и не несет ответственности за соответствующие юридические последствия. Если вы обнаружите спорный контент, пожалуйста, отправьте письмо по адресу: notice#oldtoolbag.com (при отправке письма, пожалуйста, замените # на @) для сообщения о нарушении,并提供 соответствующие доказательства. При подтверждении правонарушения сайт немедленно удалит спорный контент.

Вам может понравиться