import { pause } from '../lib.js';
import { Reporter } from '../reporter/reporter.js';
import { InformationSource } from '../reporter/information_source.js';
import { v4 as uuid } from '../../node_modules/uuid/dist/esm-browser/index.js';
/**
* @callback NotificationHandler
* @param {YoctopusNotification} notification
*/
/**
* Server connection
*/
class ServerConnection extends InformationSource{
/**
* @param {Reporter} reporter
* @param {Object} settings
* @param {URL} settings.url
* @param {number} settings.reconnectTime
*/
constructor(reporter, settings) {
// sanity checking
if (!['ws:', 'wss:'].includes(settings.url.protocol)) {
throw new Error('URL must be of a websocket server');
}
super(reporter, { ...settings, name: settings.url.hostname } );
/** @type {Reporter} */
this.reporter = reporter;
/** @type {number} seconds between (re)connection attempts; 0 = no reconnect */
this.reconnectTime = settings.reconnectTime || 30;
/**
* @protected
* @type {URL}
*/
this.url = settings.url;
/**
* @protected
* @type {Boolean}
*/
this._connected = false;
/**
* @protected
* @type {(WebSocket|null)}
*/
this._socket = null;
/**
* Use this member to toggle auto-reconnect behavior
* @type {Boolean}
*/
this.autoReconnectFlag = this.reconnectTime > 0 ? true : false;
/**
* @protected
* @type {Object<UUID,YoctopusRequest>}
*/
this._requestRegister = {}
/**
* @protected
* @type {Array<Array<Function>>}
*/
this._connectionChangeHandlers = []
this._connect();
}
get connected() {
return this._connected;
}
/**
* @type {NotificationHandler}
* @protected
*/
_notificationHandler(messageObject) {
this.report({
level: 'notice',
msg: 'Unhandled notification ' + messageObject.meta.type
})
}
/** @type {NotificationHandler} */
set notificationHandler(newHandler) {
if (typeof newHandler !== 'function') {
throw new Error('Notification handler must be a function');
}
this._notificationHandler = newHandler;
}
/**
* Connect to web address and auto-reconnect
* @protected
* @async
*/
async _connect() {
const self = this;
const onClose = function (closeEvent) {
self.report({ msg: 'Closed', level: 'notice' });
self._socket.removeEventListener('close', onClose);
self._socket.removeEventListener('message', boundMessageHandler);
self._socket = null;
self._connected = false;
self.emit( 'connection-changed', false );
return self._connect();
}
const boundMessageHandler = this._handleMessage.bind(this);
while (this._connected !== true && this.autoReconnectFlag === true) {
let socket = null;
try {
socket = await ServerConnection.openWebsocket(this.url);
}
catch(err) {
this.report({ msg: 'Failed' });
}
if (socket) {
this.report({ msg: 'Opened', level: 'notice' });
this._socket = socket;
this._connected = true;
this._socket.addEventListener('close', onClose);
this._socket.addEventListener('message', boundMessageHandler);
this.emit( 'connection-changed', true );
break;
}
else {
if (this.reconnectTime > 0) {
await pause(this.reconnectTime);
}
}
}
}
/**
* Open a websocket
* @static
* @param {URL} url
* @returns {Promise<WebSocket>} websocket
*/
static openWebsocket(url) {
let socket = new WebSocket(url.href);
let resolve = null;
let reject = null;
const promise = new Promise(
function (res, rej) {
resolve = res;
reject = rej;
}
)
const removeListeners = function () {
socket.removeEventListener('open', success);
socket.removeEventListener('close', fail);
socket.removeEventListener('error', fail);
}
const success = function (event) {
removeListeners();
resolve(socket);
}
const fail = function () {
removeListeners();
try {
socket.close();
}
catch (err) { }
socket = null;
reject(null);
}
socket.addEventListener('open', success);
socket.addEventListener('close', fail);
socket.addEventListener('error', fail);
return promise;
}
_handleMessage(msgEvent) {
let msgObj = {};
try {
msgObj = JSON.parse(msgEvent.data);
}
catch (err) {
this.report({
level: 'error',
msg: 'Could not parse msg as JSON: ' + msgEvent.message
});
return;
}
if (!msgObj.meta || !msgObj.meta.id) {
this.report({
level: 'error',
msg: 'Malformed message: ' + msgEvent.message
});
}
else if (msgObj.meta.type) {
this._notificationHandler(msgObj);
}
else if (msgObj.errors) {
console.error( 'Error response from server:', msgObj);
this._requestRegister[msgObj.meta.id].reject(msgObj.errors[0].detail);
}
else if (this._requestRegister[msgObj.meta.id]) {
this._requestRegister[msgObj.meta.id].resolve(msgObj);
delete this._requestRegister[msgObj.meta.id];
}
else {
this.report({
level: 'notice',
msg: 'Unhandled response ' + JSON.stringify(msgEvent.message)
});
}
}
/**
* @param {YoctopusRequest} request
*/
request({ method, endpoint, data }) {
if (this._connected !== true) {
throw new Error('ServerConnection not available (yet)');
}
const id = uuid();
const responsePromise = new Promise((resolve,reject) => this._requestRegister[id] = { resolve, reject });
this._socket.send(
JSON.stringify({
meta: {
id,
method,
endpoint
},
data
})
);
return responsePromise;
}
}
export { ServerConnection }