package com.gojek.mqtt.client;

import com.gojek.courier.extensions.TimeUnitExtensionsKt;
import com.gojek.courier.logging.ILogger;
import com.gojek.courier.utils.Clock;
import com.gojek.mqtt.client.listener.MessageListener;
import com.gojek.mqtt.event.EventHandler;
import com.gojek.mqtt.event.MqttEvent;
import com.gojek.mqtt.exception.CourierExceptionKt;
import com.gojek.mqtt.persistence.IMqttReceivePersistence;
import com.gojek.mqtt.persistence.model.MqttReceivePacket;
import com.gojek.mqtt.persistence.model.MqttReceivePacketKt;
import com.gojek.mqtt.utils.MqttUtils;
import com.google.android.gms.common.internal.ServiceSpecificExtraArgs;
import io.sentry.SentryEvent;
import j$.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.internal.Intrinsics;

/* compiled from: IncomingMsgControllerImpl.kt */
@Metadata(bv = {}, d1 = {"\u0000\u008c\u0001\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u000b\n\u0002\b\u0002\n\u0002\u0010\u000e\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\t\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0000\n\u0002\u0010 \n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\b\b\u0000\u0018\u0000 72\u00020\u0001:\u0003879B?\u0012\u0006\u0010\u0010\u001a\u00020\u000f\u0012\u0006\u0010\u0013\u001a\u00020\u0012\u0012\u0006\u0010\u0016\u001a\u00020\u0015\u0012\u0006\u0010\u0019\u001a\u00020\u0018\u0012\u0006\u0010\u001c\u001a\u00020\u001b\u0012\u0006\u0010\u001e\u001a\u00020\u001b\u0012\u0006\u0010 \u001a\u00020\u001f¢\u0006\u0004\b5\u00106J\b\u0010\u0003\u001a\u00020\u0002H\u0002J\u0010\u0010\u0007\u001a\u00020\u00062\u0006\u0010\u0005\u001a\u00020\u0004H\u0002J\b\u0010\b\u001a\u00020\u0002H\u0016J\u0018\u0010\r\u001a\u00020\u00022\u0006\u0010\n\u001a\u00020\t2\u0006\u0010\f\u001a\u00020\u000bH\u0016J\u0018\u0010\u000e\u001a\u00020\u00022\u0006\u0010\n\u001a\u00020\t2\u0006\u0010\f\u001a\u00020\u000bH\u0016R\u0014\u0010\u0010\u001a\u00020\u000f8\u0002X\u0082\u0004¢\u0006\u0006\n\u0004\b\u0010\u0010\u0011R\u0014\u0010\u0013\u001a\u00020\u00128\u0002X\u0082\u0004¢\u0006\u0006\n\u0004\b\u0013\u0010\u0014R\u0014\u0010\u0016\u001a\u00020\u00158\u0002X\u0082\u0004¢\u0006\u0006\n\u0004\b\u0016\u0010\u0017R\u0014\u0010\u0019\u001a\u00020\u00188\u0002X\u0082\u0004¢\u0006\u0006\n\u0004\b\u0019\u0010\u001aR\u0014\u0010\u001c\u001a\u00020\u001b8\u0002X\u0082\u0004¢\u0006\u0006\n\u0004\b\u001c\u0010\u001dR\u0014\u0010\u001e\u001a\u00020\u001b8\u0002X\u0082\u0004¢\u0006\u0006\n\u0004\b\u001e\u0010\u0
01dR\u0014\u0010 \u001a\u00020\u001f8\u0002X\u0082\u0004¢\u0006\u0006\n\u0004\b \u0010!R\u0014\u0010#\u001a\u00020\"8\u0002X\u0082\u0004¢\u0006\u0006\n\u0004\b#\u0010$R\u0014\u0010&\u001a\u00020%8\u0002X\u0082\u0004¢\u0006\u0006\n\u0004\b&\u0010'R\u0018\u0010)\u001a\u00060(R\u00020\u00008\u0002X\u0082\u0004¢\u0006\u0006\n\u0004\b)\u0010*R\u0018\u0010,\u001a\u00060+R\u00020\u00008\u0002X\u0082\u0004¢\u0006\u0006\n\u0004\b,\u0010-R&\u00100\u001a\u0014\u0012\u0004\u0012\u00020\t\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u000b0/0.8\u0002X\u0082\u0004¢\u0006\u0006\n\u0004\b0\u00101R\u001c\u00103\u001a\b\u0012\u0002\b\u0003\u0018\u0001028\u0002@\u0002X\u0082\u000e¢\u0006\u0006\n\u0004\b3\u00104¨\u0006:"}, d2 = {"Lcom/gojek/mqtt/client/IncomingMsgControllerImpl;", "Lcom/gojek/mqtt/client/IncomingMsgController;", "", "scheduleMessagesCleanup", "Lcom/gojek/mqtt/persistence/model/MqttReceivePacket;", "message", "", "notifyListeners", "triggerHandleMessage", "", "topic", "Lcom/gojek/mqtt/client/listener/MessageListener;", ServiceSpecificExtraArgs.CastExtraArgs.LISTENER, "registerListener", "unregisterListener", "Lcom/gojek/mqtt/utils/MqttUtils;", "mqttUtils", "Lcom/gojek/mqtt/utils/MqttUtils;", "Lcom/gojek/mqtt/persistence/IMqttReceivePersistence;", "mqttReceivePersistence", "Lcom/gojek/mqtt/persistence/IMqttReceivePersistence;", "Lcom/gojek/courier/logging/ILogger;", SentryEvent.JsonKeys.LOGGER, "Lcom/gojek/courier/logging/ILogger;", "Lcom/gojek/mqtt/event/EventHandler;", "eventHandler", "Lcom/gojek/mqtt/event/EventHandler;", "", "ttlSeconds", "J", "cleanupIntervalSeconds", "Lcom/gojek/courier/utils/Clock;", "clock", "Lcom/gojek/courier/utils/Clock;", "Ljava/util/concurrent/ThreadPoolExecutor;", "handleMsgThreadPool", "Ljava/util/concurrent/ThreadPoolExecutor;", "Ljava/util/concurrent/ScheduledThreadPoolExecutor;", "cleanupThreadPool", "Ljava/util/concurrent/ScheduledThreadPoolExecutor;", "Lcom/gojek/mqtt/client/IncomingMsgControllerImpl$HandleMessage;", 
"handleMessageTrigger", "Lcom/gojek/mqtt/client/IncomingMsgControllerImpl$HandleMessage;", "Lcom/gojek/mqtt/client/IncomingMsgControllerImpl$CleanupExpiredMessages;", "cleanupMessagesTrigger", "Lcom/gojek/mqtt/client/IncomingMsgControllerImpl$CleanupExpiredMessages;", "j$/util/concurrent/ConcurrentHashMap", "", "listenerMap", "Lj$/util/concurrent/ConcurrentHashMap;", "Ljava/util/concurrent/ScheduledFuture;", "cleanupFuture", "Ljava/util/concurrent/ScheduledFuture;", "<init>", "(Lcom/gojek/mqtt/utils/MqttUtils;Lcom/gojek/mqtt/persistence/IMqttReceivePersistence;Lcom/gojek/courier/logging/ILogger;Lcom/gojek/mqtt/event/EventHandler;JJLcom/gojek/courier/utils/Clock;)V", "Companion", "CleanupExpiredMessages", "HandleMessage", "mqtt-client_release"}, k = 1, mv = {1, 5, 1})
/* loaded from: classes3.dex */
public final class IncomingMsgControllerImpl implements IncomingMsgController {
    public static final String TAG = "IncomingMsgController";
    private ScheduledFuture<?> cleanupFuture;
    private final long cleanupIntervalSeconds;
    private final CleanupExpiredMessages cleanupMessagesTrigger;
    private final ScheduledThreadPoolExecutor cleanupThreadPool;
    private final Clock clock;
    private final EventHandler eventHandler;
    private final HandleMessage handleMessageTrigger;
    private final ThreadPoolExecutor handleMsgThreadPool;
    private final ConcurrentHashMap<String, List<MessageListener>> listenerMap;
    private final ILogger logger;
    private final IMqttReceivePersistence mqttReceivePersistence;
    private final MqttUtils mqttUtils;
    private final long ttlSeconds;

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: IncomingMsgControllerImpl.kt */
    @Metadata(d1 = {"\u0000\u0012\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n\u0000\b\u0082\u0004\u0018\u00002\u00020\u0001B\u0005¢\u0006\u0002\u0010\u0002J\b\u0010\u0003\u001a\u00020\u0004H\u0016¨\u0006\u0005"}, d2 = {"Lcom/gojek/mqtt/client/IncomingMsgControllerImpl$CleanupExpiredMessages;", "Ljava/lang/Runnable;", "(Lcom/gojek/mqtt/client/IncomingMsgControllerImpl;)V", "run", "", "mqtt-client_release"}, k = 1, mv = {1, 5, 1}, xi = 48)
    /* loaded from: classes3.dex */
    /**
     * Task that purges persisted incoming messages whose timestamp is older than the
     * controller's TTL.
     *
     * <p>The cut-off is the controller clock's current {@code nanoTime()} minus
     * {@code ttlSeconds} converted to nanoseconds; the number of removed rows is logged.
     */
    public final class CleanupExpiredMessages implements Runnable {
        final /* synthetic */ IncomingMsgControllerImpl this$0;

        public CleanupExpiredMessages(IncomingMsgControllerImpl this$0) {
            Intrinsics.checkNotNullParameter(this$0, "this$0");
            this.this$0 = this$0;
        }

        /** Deletes every stored message older than the TTL cut-off and logs how many were removed. */
        @Override // java.lang.Runnable
        public void run() {
            final IncomingMsgControllerImpl controller = this.this$0;
            controller.logger.d(IncomingMsgControllerImpl.TAG, "Deleting expired messages");

            // Anything stamped before (now - ttl) is considered expired.
            final long nowNanos = controller.clock.nanoTime();
            final long ttlNanos = TimeUnitExtensionsKt.fromSecondsToNanos(controller.ttlSeconds);
            final int deletedCount = controller.mqttReceivePersistence.removeMessagesWithOlderTimestamp(nowNanos - ttlNanos);

            controller.logger.d(IncomingMsgControllerImpl.TAG, "Deleted " + deletedCount + " expired messages");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: IncomingMsgControllerImpl.kt */
    @Metadata(d1 = {"\u0000\"\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n\u0000\n\u0002\u0010 \n\u0002\u0010\t\n\u0000\n\u0002\u0010\u0002\n\u0000\b\u0082\u0004\u0018\u00002\u00020\u0001B\u0005¢\u0006\u0002\u0010\u0002J\u0016\u0010\u0003\u001a\u00020\u00042\f\u0010\u0005\u001a\b\u0012\u0004\u0012\u00020\u00070\u0006H\u0002J\b\u0010\b\u001a\u00020\tH\u0016¨\u0006\n"}, d2 = {"Lcom/gojek/mqtt/client/IncomingMsgControllerImpl$HandleMessage;", "Ljava/lang/Runnable;", "(Lcom/gojek/mqtt/client/IncomingMsgControllerImpl;)V", "deleteMessages", "", "messageIds", "", "", "run", "", "mqtt-client_release"}, k = 1, mv = {1, 5, 1}, xi = 48)
    /* loaded from: classes3.dex */
    /**
     * Task that drains persisted incoming messages to the registered listeners.
     *
     * <p>It loads every stored message whose topic has a registered listener, hands each
     * one to {@code notifyListeners}, and deletes the messages for which that call returned
     * {@code true}. Whatever path the run takes, a cleanup of expired messages is
     * (re)scheduled afterwards.
     */
    public final class HandleMessage implements Runnable {
        final /* synthetic */ IncomingMsgControllerImpl this$0;

        public HandleMessage(IncomingMsgControllerImpl this$0) {
            Intrinsics.checkNotNullParameter(this$0, "this$0");
            this.this$0 = this$0;
        }

        /**
         * Removes the given messages from the receive persistence.
         *
         * @param messageIds ids of the messages to delete
         * @return the count reported by the persistence layer
         */
        private final int deleteMessages(List<Long> messageIds) {
            final IMqttReceivePersistence persistence = this.this$0.mqttReceivePersistence;
            return persistence.removeReceivedMessages(messageIds);
        }

        @Override // java.lang.Runnable
        public void run() {
            final IncomingMsgControllerImpl controller = this.this$0;
            try {
                // Nothing to deliver to: leave stored messages untouched.
                if (controller.listenerMap.keySet().isEmpty()) {
                    controller.logger.d(IncomingMsgControllerImpl.TAG, "No listeners registered");
                    return;
                }

                final IMqttReceivePersistence persistence = controller.mqttReceivePersistence;
                final Set<String> topics = controller.listenerMap.keySet();
                Intrinsics.checkNotNullExpressionValue(topics, "listenerMap.keys");
                final List<MqttReceivePacket> pending = persistence.getAllIncomingMessagesWithTopicFilter(topics);
                if (controller.mqttUtils.isEmpty(pending)) {
                    controller.logger.d(IncomingMsgControllerImpl.TAG, "No Messages in Table");
                    return;
                }

                final List<Long> deliveredIds = new ArrayList<>();
                for (MqttReceivePacket packet : pending) {
                    final long messageId = packet.getMessageId();
                    controller.logger.d(IncomingMsgControllerImpl.TAG, Intrinsics.stringPlus("Going to process ", Long.valueOf(messageId)));
                    final boolean notified = controller.notifyListeners(packet);
                    if (notified) {
                        deliveredIds.add(Long.valueOf(messageId));
                    }
                    // Note: this line is logged whether or not any listener was notified.
                    controller.logger.d(IncomingMsgControllerImpl.TAG, Intrinsics.stringPlus("Successfully Processed Message ", Long.valueOf(messageId)));
                }

                if (deliveredIds.isEmpty()) {
                    return;
                }
                final int deletedCount = deleteMessages(deliveredIds);
                controller.logger.d(IncomingMsgControllerImpl.TAG, "Deleted " + deletedCount + " messages");
            } finally {
                // Always push the expiry cleanup out by another interval after a run.
                controller.scheduleMessagesCleanup();
            }
        }
    }

    /**
     * Creates the controller and its two executors.
     *
     * <p>Message handling runs on a single-thread pool backed by a queue of capacity 1 with
     * a discard policy, so submissions beyond one running and one queued task are silently
     * dropped. Expired-message cleanup runs on a separate single-thread scheduled pool.
     *
     * @param mqttUtils supplies the thread factories and the list emptiness check
     * @param mqttReceivePersistence store of received messages
     * @param logger debug logger
     * @param eventHandler receives error events raised while notifying listeners
     * @param j stored as {@code ttlSeconds}: age after which stored messages are purged
     * @param j2 stored as {@code cleanupIntervalSeconds}: delay before a scheduled cleanup runs
     * @param clock time source used to compute the expiry cut-off
     */
    public IncomingMsgControllerImpl(MqttUtils mqttUtils, IMqttReceivePersistence mqttReceivePersistence, ILogger logger, EventHandler eventHandler, long j, long j2, Clock clock) {
        Intrinsics.checkNotNullParameter(mqttUtils, "mqttUtils");
        Intrinsics.checkNotNullParameter(mqttReceivePersistence, "mqttReceivePersistence");
        Intrinsics.checkNotNullParameter(logger, "logger");
        Intrinsics.checkNotNullParameter(eventHandler, "eventHandler");
        Intrinsics.checkNotNullParameter(clock, "clock");
        this.mqttUtils = mqttUtils;
        this.mqttReceivePersistence = mqttReceivePersistence;
        this.logger = logger;
        this.eventHandler = eventHandler;
        this.ttlSeconds = j;
        this.cleanupIntervalSeconds = j2;
        this.clock = clock;

        // One worker, one queued slot: redundant triggers are discarded rather than piling up.
        final ThreadPoolExecutor messagePool = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1), mqttUtils.threadFactory("msg-store", false));
        messagePool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        this.handleMsgThreadPool = messagePool;

        this.cleanupThreadPool = new ScheduledThreadPoolExecutor(1, mqttUtils.threadFactory("msg-store-cleanup", false), new ThreadPoolExecutor.DiscardPolicy());
        this.handleMessageTrigger = new HandleMessage(this);
        this.cleanupMessagesTrigger = new CleanupExpiredMessages(this);
        this.listenerMap = new ConcurrentHashMap<>();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Delivers {@code message} to every listener registered for its topic.
     *
     * <p>The result flag is raised before each listener is invoked, so it is {@code true}
     * as soon as at least one listener exists for the topic, even if that listener then
     * throws. Any {@link Throwable} (including the failed null check when the topic has no
     * listener list) stops delivery, is logged, and is reported to the event handler as a
     * {@code MqttMessageReceiveErrorEvent}.
     *
     * @param message the persisted packet to dispatch
     * @return {@code true} if delivery to at least one listener was attempted
     */
    public final boolean notifyListeners(MqttReceivePacket message) {
        boolean notified = false;
        try {
            final List<MessageListener> topicListeners = this.listenerMap.get(message.getTopic());
            Intrinsics.checkNotNull(topicListeners);
            Intrinsics.checkNotNullExpressionValue(topicListeners, "listenerMap[message.topic]!!");
            for (MessageListener topicListener : topicListeners) {
                // Mark first: a throwing listener still counts as a delivery attempt.
                notified = true;
                topicListener.onMessageReceived(MqttReceivePacketKt.toMqttMessage(message));
            }
        } catch (Throwable failure) {
            this.logger.d(TAG, Intrinsics.stringPlus("Exception while processing message ", failure));
            this.eventHandler.onEvent(new MqttEvent.MqttMessageReceiveErrorEvent(message.getTopic(), message.getMessage().length, CourierExceptionKt.toCourierException(failure), null, 8, null));
        }
        return notified;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * (Re)schedules the expired-message cleanup to run once, {@code cleanupIntervalSeconds}
     * from now.
     *
     * <p>A previously scheduled cleanup, if any, is cancelled first without interrupting it
     * if it is already running, so at most one pending cleanup is tracked in
     * {@code cleanupFuture}.
     */
    public final void scheduleMessagesCleanup() {
        final ScheduledFuture<?> previous = this.cleanupFuture;
        if (previous != null) {
            previous.cancel(false);
        }
        final ScheduledFuture<?> next = this.cleanupThreadPool.schedule(this.cleanupMessagesTrigger, this.cleanupIntervalSeconds, TimeUnit.SECONDS);
        this.cleanupFuture = next;
    }

    /**
     * Adds {@code listener} to the listeners of {@code topic} and triggers a message
     * handling run.
     *
     * <p>The stored list is never mutated: a new list (the current one plus the listener)
     * replaces the map entry.
     *
     * @param topic topic to listen on
     * @param listener listener to add
     */
    @Override // com.gojek.mqtt.client.IncomingMsgController
    public synchronized void registerListener(String topic, MessageListener listener) {
        Intrinsics.checkNotNullParameter(topic, "topic");
        Intrinsics.checkNotNullParameter(listener, "listener");
        final List<MessageListener> registered = this.listenerMap.get(topic);
        final List<MessageListener> base = registered != null ? registered : CollectionsKt.<MessageListener>emptyList();
        this.listenerMap.put(topic, CollectionsKt.plus((Collection<? extends MessageListener>) base, listener));
        triggerHandleMessage();
    }

    /**
     * Requests a message handling run by submitting the shared {@code HandleMessage} task
     * to {@code handleMsgThreadPool}.
     *
     * <p>The returned future is not retained. The constructor configures that pool with one
     * thread, a queue of capacity 1 and a discard policy, so a request made while one run
     * is executing and another is queued is dropped.
     */
    @Override // com.gojek.mqtt.client.IncomingMsgController
    public void triggerHandleMessage() {
        final HandleMessage task = this.handleMessageTrigger;
        this.handleMsgThreadPool.submit(task);
    }

    /**
     * Removes {@code listener} from the listeners registered for {@code topic}.
     *
     * <p>If the topic has no entry the call leaves the map untouched. When removing the
     * listener leaves no listeners for the topic, the topic entry is dropped; otherwise the
     * entry is replaced with the reduced list. The map is updated with a single write, so
     * the message-handling thread (which reads the map without this lock) never observes
     * an intermediate empty list for the topic.
     *
     * @param topic topic the listener was registered on
     * @param listener listener to remove
     */
    @Override // com.gojek.mqtt.client.IncomingMsgController
    public synchronized void unregisterListener(String topic, MessageListener listener) {
        Intrinsics.checkNotNullParameter(topic, "topic");
        Intrinsics.checkNotNullParameter(listener, "listener");
        final List<MessageListener> registered = this.listenerMap.get(topic);
        if (registered == null) {
            // Nothing was registered for this topic; avoid inserting a placeholder entry.
            return;
        }
        final List<MessageListener> remaining = CollectionsKt.minus(registered, listener);
        if (remaining.isEmpty()) {
            this.listenerMap.remove(topic);
        } else {
            this.listenerMap.put(topic, remaining);
        }
    }
}
