package com.primeton.pmq.broker.region.virtual;

import com.primeton.pmq.broker.Broker;
import com.primeton.pmq.broker.BrokerService;
import com.primeton.pmq.broker.ConnectionContext;
import com.primeton.pmq.broker.ProducerBrokerExchange;
import com.primeton.pmq.broker.region.Destination;
import com.primeton.pmq.broker.region.DestinationFilter;
import com.primeton.pmq.broker.region.Topic;
import com.primeton.pmq.command.ConnectionId;
import com.primeton.pmq.command.LocalTransactionId;
import com.primeton.pmq.command.Message;
import com.primeton.pmq.command.PMQDestination;
import com.primeton.pmq.command.PMQQueue;
import com.primeton.pmq.util.LRUCache;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:com/primeton/pmq/broker/region/virtual/VirtualTopicInterceptor.class */
/**
 * Destination filter that, in addition to delegating every send to the wrapped
 * destination, forwards a copy of each non-advisory message to the destinations
 * the broker resolves for the queue name {@code prefix + topicName + postfix}.
 *
 * <p>The prefix, postfix and the {@code local}, {@code concurrentSend} and
 * {@code transactedSend} switches are read once from the supplied
 * {@link VirtualTopic} at construction time.
 */
public class VirtualTopicInterceptor extends DestinationFilter {
    /** Text prepended to the physical name of the sending destination when building the consumer-queue name. */
    private final String prefix;
    /** Text appended to the physical name of the sending destination when building the consumer-queue name. */
    private final String postfix;
    /** When {@code true}, messages whose broker path is non-null are not fanned out to the consumer queues. */
    private final boolean local;
    /** When {@code true} and more than one target exists, the copies are sent from task-runner threads. */
    private final boolean concurrentSend;
    /** When {@code true}, eligible fan-outs are wrapped in a broker-local transaction (see {@link #beginLocalTransaction}). */
    private final boolean transactedSend;
    /** Memoizes the consumer-queue destination per sending destination; every access is guarded by {@code synchronized (cache)}. */
    private final LRUCache<PMQDestination, PMQQueue> cache = new LRUCache<>();

    /**
     * Creates an interceptor around {@code destination} configured from {@code virtualTopic}.
     *
     * @param destination  the destination every send is ultimately delegated to
     * @param virtualTopic source of the prefix, postfix and send-mode flags
     */
    public VirtualTopicInterceptor(Destination destination, VirtualTopic virtualTopic) {
        super(destination);
        this.prefix = virtualTopic.getPrefix();
        this.postfix = virtualTopic.getPostfix();
        this.local = virtualTopic.isLocal();
        this.concurrentSend = virtualTopic.isConcurrentSend();
        this.transactedSend = virtualTopic.isTransactedSend();
    }

    /**
     * Returns the wrapped destination cast to {@link Topic}.
     *
     * @return the wrapped destination as a topic
     * @throws ClassCastException if the wrapped destination is not a {@link Topic}
     */
    public Topic getTopic() {
        return (Topic) this.next;
    }

    /**
     * Fans the message out to the matching consumer queues (unless it is an advisory
     * message, or this interceptor is {@code local} and the message has a broker path)
     * and then always delegates to the superclass send.
     *
     * @param producerBrokerExchange exchange of the sending producer
     * @param message                the message being sent
     * @throws Exception if the fan-out or the delegated send fails; a fan-out failure
     *                   prevents the delegated send from running
     */
    @Override
    public void send(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
        boolean eligibleForFanOut = !this.local || message.getBrokerPath() == null;
        if (!message.isAdvisory() && eligibleForFanOut) {
            send(producerBrokerExchange, message, getQueueConsumersWildcard(message.getDestination()));
        }
        super.send(producerBrokerExchange, message);
    }

    /**
     * Sends a copy of {@code message} to every destination the broker returns for
     * {@code pMQDestination} that passes {@link #shouldDispatch}.
     *
     * <p>The copies are sent one after another on the calling thread unless
     * {@code concurrentSend} is enabled and more than one destination was resolved; in
     * that case one task per destination is handed to the broker service's task runner
     * factory and this method blocks until all of them have finished. Whatever
     * {@link #beginLocalTransaction} started is committed in a {@code finally} block,
     * i.e. also when a send failed.
     *
     * @param producerBrokerExchange exchange of the sending producer
     * @param message                the message to copy and forward
     * @param pMQDestination         destination used to look up the targets
     * @throws Exception the failure of a send (for concurrent sends: the first failure
     *                   recorded), an interruption while waiting, or a commit failure
     */
    @Override
    protected void send(final ProducerBrokerExchange producerBrokerExchange, final Message message, PMQDestination pMQDestination) throws Exception {
        Broker broker = producerBrokerExchange.getConnectionContext().getBroker();
        Set<Destination> targets = broker.getDestinations(pMQDestination);
        int targetCount = targets.size();
        LocalTransactionId localTxId = beginLocalTransaction(targetCount, producerBrokerExchange.getConnectionContext(), message);
        try {
            if (this.concurrentSend && targetCount > 1) {
                // One latch count per resolved destination, whether or not it is dispatched to.
                final CountDownLatch pending = new CountDownLatch(targetCount);
                final AtomicReference<Exception> firstFailure = new AtomicReference<>();
                BrokerService brokerService = broker.getBrokerService();
                for (final Destination target : targets) {
                    if (!shouldDispatch(broker, message, target)) {
                        // Filtered out: release its latch slot right away.
                        pending.countDown();
                        continue;
                    }
                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // Skip the send once another task has already reported a failure.
                                if (firstFailure.get() == null) {
                                    target.send(producerBrokerExchange, VirtualTopicInterceptor.this.copy(message, target.getPMQDestination()));
                                }
                            } catch (Exception e) {
                                // Keep the earliest failure; later ones must not overwrite it.
                                firstFailure.compareAndSet(null, e);
                            } finally {
                                pending.countDown();
                            }
                        }
                    });
                }
                pending.await();
                Exception failure = firstFailure.get();
                if (failure != null) {
                    throw failure;
                }
            } else {
                for (Destination target : targets) {
                    if (shouldDispatch(broker, message, target)) {
                        target.send(producerBrokerExchange, copy(message, target.getPMQDestination()));
                    }
                }
            }
        } finally {
            commit(localTxId, producerBrokerExchange.getConnectionContext(), message);
        }
    }

    /**
     * Creates a copy of {@code message} addressed to {@code pMQDestination}; the copy's
     * original destination is set to the destination of {@code message}.
     *
     * @param message        the message to copy
     * @param pMQDestination the destination to set on the copy
     * @return the re-addressed copy
     */
    public Message copy(Message message, PMQDestination pMQDestination) {
        Message forwarded = message.copy();
        forwarded.setDestination(pMQDestination);
        forwarded.setOriginalDestination(message.getDestination());
        return forwarded;
    }

    /**
     * Starts a broker-local transaction around the fan-out when {@code transactedSend}
     * is enabled, more than one target exists, the message is persistent and it is not
     * already part of a transaction. The transaction id is built from the message's
     * producer id and producer sequence id, and is set on the message; the transaction
     * is set on the connection context.
     *
     * @param i                 number of target destinations
     * @param connectionContext context the transaction is registered on
     * @param message           the message about to be fanned out
     * @return the id of the started transaction, or {@code null} if none was started
     * @throws Exception if the broker fails to begin the transaction
     */
    private LocalTransactionId beginLocalTransaction(int i, ConnectionContext connectionContext, Message message) throws Exception {
        boolean wrapInTransaction = this.transactedSend && i > 1 && message.isPersistent() && message.getTransactionId() == null;
        if (!wrapInTransaction) {
            return null;
        }
        ConnectionId connectionId = new ConnectionId(message.getMessageId().getProducerId().toString());
        LocalTransactionId localTransactionId = new LocalTransactionId(connectionId, message.getMessageId().getProducerSequenceId());
        connectionContext.getBroker().beginTransaction(connectionContext, localTransactionId);
        connectionContext.setTransaction(connectionContext.getTransactions().get(localTransactionId));
        message.setTransactionId(localTransactionId);
        return localTransactionId;
    }

    /**
     * Commits the transaction started by {@link #beginLocalTransaction}, if any, and
     * undoes the state that method set on the connection context and the message.
     *
     * @param localTransactionId id returned by {@link #beginLocalTransaction}; {@code null} makes this a no-op
     * @param connectionContext  context the transaction was registered on
     * @param message            the message whose transaction id is cleared
     * @throws Exception if the broker fails to commit the transaction
     */
    private void commit(LocalTransactionId localTransactionId, ConnectionContext connectionContext, Message message) throws Exception {
        if (localTransactionId == null) {
            return;
        }
        try {
            connectionContext.getBroker().commitTransaction(connectionContext, localTransactionId, true);
        } finally {
            // Reset even when the commit throws, so neither the context nor the caller's
            // message keeps pointing at the transaction this class created.
            connectionContext.getTransactions().remove(localTransactionId);
            connectionContext.setTransaction(null);
            message.setTransactionId(null);
        }
    }

    /**
     * Decides whether a copy of the message is sent to {@code destination}.
     *
     * <p>If the prefix does not contain {@code ".*"}, or starts with {@code "*"}, every
     * destination is accepted. Otherwise the destination's name must start with the
     * part of the prefix that precedes the first {@code ".*"}.
     *
     * @param broker      the broker (not used by this implementation)
     * @param message     the message (not used by this implementation)
     * @param destination the candidate target
     * @return {@code true} if the message should be sent to {@code destination}
     * @throws IOException never thrown by this implementation
     */
    public boolean shouldDispatch(Broker broker, Message message, Destination destination) throws IOException {
        int wildcardAt = this.prefix.indexOf(".*");
        if (wildcardAt < 0 || this.prefix.startsWith("*")) {
            return true;
        }
        return destination.getName().startsWith(this.prefix.substring(0, wildcardAt));
    }

    /**
     * Returns the queue destination named {@code prefix + physicalName + postfix} for
     * the given destination, creating and caching it on first use.
     * NOTE(review): the method name suggests the result is used as a wildcard pattern
     * by the broker's destination lookup — confirm against {@code Broker#getDestinations}.
     *
     * @param pMQDestination the destination the message was sent to
     * @return the cached or newly created queue destination
     */
    protected PMQDestination getQueueConsumersWildcard(PMQDestination pMQDestination) {
        synchronized (this.cache) {
            PMQQueue consumerQueues = this.cache.get(pMQDestination);
            if (consumerQueues == null) {
                consumerQueues = new PMQQueue(this.prefix + pMQDestination.getPhysicalName() + this.postfix);
                this.cache.put(pMQDestination, consumerQueues);
            }
            return consumerQueues;
        }
    }
}
