import UpdateMessage from "../models/UpdateMessage";

const { EventHubConsumerClient } = require("@azure/event-hubs");

// Event Hub consumer client created by toIoTHub; undefined until a connection is opened.
let client;
// Handle returned by client.subscribe(); closed by disconnect().
let subscription;

/**
 * Opens an Event Hub consumer and forwards every received event to `callback`.
 *
 * The created client and subscription are stored in the module-level
 * `client` / `subscription` variables so they can be closed later.
 *
 * @function toIoTHub
 * @param {Object} sessionState session settings; must provide
 *   `REACT_APP_EVENT_HUB_CONSUMER_GROUP` and
 *   `REACT_APP_EVENT_HUB_CONNECTION_STRING`
 * @param {Function} callback invoked once per event with an `UpdateMessage`
 *   built from that event
 * @throws {Error} when either required setting is missing (including when
 *   `sessionState` itself is null/undefined)
 */
const toIoTHub = async (sessionState, callback) => {
  // The connection string carries credentials, so it is deliberately kept out
  // of the log.
  console.log("connecting to event hub");

  const consumerGroup = sessionState?.REACT_APP_EVENT_HUB_CONSUMER_GROUP;
  const connectionString = sessionState?.REACT_APP_EVENT_HUB_CONNECTION_STRING;

  if (!consumerGroup || !connectionString) {
    throw new Error("EventHub session connection string missing");
  }

  client = new EventHubConsumerClient(consumerGroup, connectionString);

  // subscribe to get events
  subscription = client.subscribe({
    processEvents: async (events, context) => {
      if (!events.length) return;

      // Every event is wrapped and forwarded; no filtering is applied here.
      for (const event of events) {
        callback(new UpdateMessage(event));
      }

      // Checkpoint on the last event of the batch once all were forwarded.
      await context.updateCheckpoint(events[events.length - 1]);
    },
    processError: async err => {
      // Errors are only reported; no reconnect is attempted.
      console.log("processError", err);
    }
  });
};

/**
 * Starts the Event Hub subscription unless REACT_APP_SIGNALR_ENV is "local".
 *
 * Any failure while connecting is written to console.error and not rethrown,
 * so the returned promise always resolves.
 *
 * @function subscribe
 * @param {Object} sessionState session settings passed through to toIoTHub
 * @param {Function} callback receiver passed through to toIoTHub
 */
const subscribe = async (sessionState, callback) => {
  try {
    // Local runs skip the Event Hub connection entirely.
    if (process.env.REACT_APP_SIGNALR_ENV === "local") {
      return;
    }

    await toIoTHub(sessionState, callback);
  } catch (failure) {
    console.error(failure);
  }
};

/**
 * Closes the current Event Hub subscription and client, if any.
 *
 * The module-level references are cleared up front so a later call does not
 * try to close the same handles again. The client is closed even when closing
 * the subscription fails; in that case the error still propagates to the
 * caller.
 *
 * @function disconnect
 * @returns {Promise<boolean>} resolves to `true` once both handles are closed
 */
const disconnect = async () => {
  console.log("disconnecting from event hub");

  const activeSubscription = subscription;
  const activeClient = client;
  subscription = undefined;
  client = undefined;

  try {
    if (activeSubscription) await activeSubscription.close();
  } finally {
    // Runs even if the subscription close rejected, so the connection is not leaked.
    if (activeClient) await activeClient.close();
  }
  return true;
};

export { subscribe, disconnect };
