stasher::manager->manage_subscription(): a managed subscription

#include <iostream>
#include <stasher/client.H>
#include <stasher/manager.H>
#include <stasher/managedsubscriber.H>
#include <x/mpobj.H>

#include <cstdlib>
#include <list>
#include <queue>
#include <string>

class mySubscriberObj : public stasher::managedsubscriberObj {

public:
	mySubscriberObj() {}
	~mySubscriberObj() noexcept
	{
	}

	typedef x::mpcobj<std::queue<std::string> > queue_t;

	queue_t queue;

	// Invoked when the connection gets established, or breaks.

	void connection_update(stasher::req_stat_t status) override
	{
		std::cout << ("Connection update: " + x::tostring(status)
			      + "\n") << std::flush;
	}

	void updated(const std::string &objname,
		     const std::string &suffix) override
	{
		queue_t::lock lock(queue);

		lock->push(objname+suffix);

		lock.notify_one();
	}

	std::string get()
	{
		queue_t::lock lock(queue);

		while (lock->empty())
			lock.wait();

		std::string s=lock->front();

		lock->pop();
		return s;
	}
};

void simplesubscribe(int argc, char **argv)
{
	if (argc < 2)
		throw EXCEPTION("Usage: simplesubscribe {object}+");

	stasher::client client=stasher::client::base::connect();

	auto manager=stasher::manager::create(L"", "10 seconds");

	auto subscriber=x::ref<mySubscriberObj>::create();

	std::list<x::ref<x::obj> > mcguffins;

	for (int i=1; i<argc; ++i)
	{
		mcguffins.push_back(manager->manage_subscription(client,
								 argv[i],
								 subscriber));
	}

	while (1)
	{
		std::string object=subscriber->get();

		stasher::client::base::getreq req
			=stasher::client::base::getreq::create();

		req->objects.insert(object);
		req->openobjects=true;

		stasher::contents contents=client->get(req)->objects;

		if (!contents->succeeded)
			throw EXCEPTION(contents->errmsg);

		auto iter=contents->find(object);

		if (iter == contents->end())
		{
			std::cout << object << " removed" << std::endl;
			continue;
		}

		std::string line;

		std::getline(*iter->second.fd->getistream(), line);

		if (line == "stop")
			break;
		std::cout << object << ": " << line << std::endl;
	}
}

int main(int argc, char **argv)
{
	try {
		simplesubscribe(argc, argv);
	} catch (const x::exception &e)
	{
		std::cerr << e << std::endl;
		exit(1);
	}

	return 0;
}

A subscription opened with stasher::client->subscribe() gets cancelled if the client connection to the server drops for any reason. An application needs to detect that using the subscription's cancellation mcguffin, and make periodic resubscription attempts. As described in the section called “A simple C++ client: getting and modifying objects”, when a connection with the stasher server breaks, the next request results in an automatic attempt to reconnect with the server.

The manager object takes care of that. When a managed subscriber's connection breaks, the manager object makes periodic attempts to resubscribe. To use a managed subscription, subclass stasher::managedsubscriberObj instead of stasher::client::base::subscriberObj, and implement updated() and connection_update(), then invoke stasher::manager->manage_subscription(). The first parameter to manage_subscription() is the client connection handle, the second parameter is the name of an object or a hierarchy to subscribe to, and the third parameter is an x::ref to an instance of the stasher::managedsubscriberObj subclass.

The same subscriber object can be a callback for multiple subscriptions, or each subscription can have a separate callback object instance. For each subscription, the manager holds a strong reference on the callback object, and its client connection.

There is no explicit un-manage method. manage_subscription() returns a mcguffin. The manager keeps a strong reference on the subscriber callback object, and the client connection handle, as long as both the manager object and the mcguffin exist. The manager stops managing the subscription when the last reference to the mcguffin goes out of scope and the mcguffin gets destroyed.
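The mcguffin's role can be illustrated with a minimal sketch that uses only the standard library. Here toy_manager, manage(), and active() are hypothetical stand-ins, not part of the stasher API; the sketch models only the rule that a subscription stays managed while some reference to its mcguffin exists, and it does not model the strong references on the subscriber or the client.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the manager object; NOT the stasher API.
class toy_manager {

	// Only weak references are kept here: the caller's mcguffin alone
	// decides how long each subscription stays managed.
	std::vector<std::pair<std::string, std::weak_ptr<void>>> managed;

public:
	// Returns an opaque handle that plays the role of the mcguffin.
	std::shared_ptr<void> manage(const std::string &name)
	{
		auto mcguffin=std::make_shared<int>(0);

		managed.emplace_back(name, mcguffin);
		return mcguffin;
	}

	// Counts the subscriptions whose mcguffin still exists.
	std::size_t active() const
	{
		std::size_t n=0;

		for (const auto &m:managed)
			if (!m.second.expired())
				++n;
		return n;
	}
};

std::size_t mcguffin_demo()
{
	toy_manager manager;
	std::list<std::shared_ptr<void>> mcguffins;

	mcguffins.push_back(manager.manage("fruits/"));
	mcguffins.push_back(manager.manage(""));
	assert(manager.active() == 2);

	// Destroying the only reference to the first mcguffin is the
	// "un-manage" operation for the fruits/ subscription.
	mcguffins.pop_front();

	return manager.active();
}
```

Calling mcguffin_demo() from a main() returns the number of subscriptions that are still managed after one mcguffin was dropped. This is why the example program keeps every mcguffin in a std::list for as long as it runs.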

The updated() callback works just like it does with stasher::client->subscribe(). A new method, connection_update(), gets called with a stasher::req_processed_stat when the subscription gets opened. This happens shortly after, or maybe even prior to, manage_subscription() returning to the caller. When the connection with the server breaks, for any reason, connection_update() gets called with a stasher::req_disconnected_stat. connection_update() gets called with a stasher::req_processed_stat again when the manager reestablishes the subscription after reconnecting with the server. Other status values for connection_update() indicate an error with opening the subscription.

$ ./updatethread fruits/ ""
Connection update: Transaction/request processed
Connection update: Transaction/request processed
pi: 3.1415926
fruits/apple removed
Connection update: Connection to the server failed
Connection update: Connection to the server failed
Connection update: Transaction/request processed
Connection update: Transaction/request processed
fruits/grape: juicy

This example subscribes to two hierarchies: "" (the top level hierarchy), and fruits/ (the fruits hierarchy; note the trailing /). Each subscription logs a connection_update(stasher::req_processed_stat). After both subscriptions are opened, one object gets added or updated in the top level hierarchy (pi), and one object gets removed from the fruits hierarchy (fruits/apple). The connection with the server drops afterwards, and each subscription logs a connection_update(stasher::req_disconnected_stat). The connection manager reestablishes the connection some time later, and each subscription logs a connection_update(stasher::req_processed_stat). After the connection gets reestablished, the fruits/grape object gets created or updated.

The connection manager tries to reestablish the connection and the subscription periodically. An application may see additional connection_update(stasher::req_disconnected_stat) calls before a connection_update(stasher::req_processed_stat).
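A callback therefore has to tolerate repeated disconnect reports. The following is a minimal sketch of one way to track this, assuming a hypothetical toy_status enumeration in place of stasher::req_stat_t and a hypothetical connection_tracker class in place of a real stasher::managedsubscriberObj subclass.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for stasher::req_stat_t, limited to the two
// status values named in the text.
enum class toy_status { processed, disconnected };

// Hypothetical tracker; a real callback would subclass
// stasher::managedsubscriberObj instead.
class connection_tracker {

	bool subscribed=false;
	int resyncs=0;

public:
	void connection_update(toy_status status)
	{
		if (status == toy_status::processed)
		{
			// The subscription was opened or reopened: whatever
			// is being monitored should be retrieved again.
			subscribed=true;
			++resyncs;
			return;
		}

		// A disconnect may be reported more than once before the
		// next successful resubscription. Repeats change nothing.
		subscribed=false;
	}

	bool is_subscribed() const { return subscribed; }
	int resync_count() const { return resyncs; }
};

int tracker_demo()
{
	connection_tracker tracker;

	// The sequence from the sample output, as seen by one subscription.
	std::vector<toy_status> sequence={
		toy_status::processed,
		toy_status::disconnected,
		toy_status::disconnected,
		toy_status::processed
	};

	for (auto status:sequence)
		tracker.connection_update(status);

	assert(tracker.is_subscribed());
	return tracker.resync_count();
}
```

tracker_demo() returns the number of times the subscription was opened, which is also the number of times the application would need to refresh its view of the subscribed object or hierarchy.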

These callbacks have certain limitations; see the section called “What asynchronous C++ API methods can and cannot do” for more information. Each managed subscription counts towards the client's connection limits.

Note

After a subscription gets opened or reopened, stasher::managedsubscriberObj's updated() gets called only to report subsequent changes to the object (if the subscription was for an individual object), or to some object in the hierarchy (if the subscription was for a hierarchy), from that point going forward. In particular, changes could have occurred between a connection_update(stasher::req_disconnected_stat) and a connection_update(stasher::req_processed_stat), and updated() does not report them.

An application should respond to a connection_update(stasher::req_processed_stat) by retrieving the contents of the subscribed object, or the hierarchy, at that time. In the example in this chapter, this can be done by having connection_update(stasher::req_processed_stat) put the subscribed-to object's name into the queue. Note that this requires using a separate, individual subscriber class instance for each object instead of a single class instance for all subscriptions, because each one must know which object it is for. This results in the main execution thread using the same code to check for the object's current value, and to refresh it.
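This arrangement can be sketched with the standard library alone. In the sketch below, name_queue is a std::mutex and std::condition_variable stand-in for the example's x::mpcobj queue, and toy_req_stat and per_object_subscriber are hypothetical names, not stasher classes. It shows only the queueing logic, with one subscriber instance per object sharing a single queue.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

// Standard-library stand-in for the example's x::mpcobj-protected queue.
class name_queue {

	std::mutex m;
	std::condition_variable cv;
	std::queue<std::string> q;

public:
	void push(const std::string &s)
	{
		{
			std::lock_guard<std::mutex> lock(m);

			q.push(s);
		}
		cv.notify_one();
	}

	// Blocks until a name is available, then removes and returns it.
	std::string get()
	{
		std::unique_lock<std::mutex> lock(m);

		cv.wait(lock, [this] { return !q.empty(); });

		std::string s=q.front();

		q.pop();
		return s;
	}
};

// Hypothetical stand-in for stasher::req_stat_t.
enum class toy_req_stat { processed, disconnected };

// Hypothetical per-object subscriber: one instance per subscription,
// each one knows the name of the object it is subscribed to.
class per_object_subscriber {

	std::string name;
	std::shared_ptr<name_queue> queue;

public:
	per_object_subscriber(std::string name,
			      std::shared_ptr<name_queue> queue)
		: name(std::move(name)), queue(std::move(queue))
	{
	}

	void connection_update(toy_req_stat status)
	{
		// A (re)opened subscription queues its own object, so that
		// the main thread fetches the object's current value.
		if (status == toy_req_stat::processed)
			queue->push(name);
	}

	void updated(const std::string &objname, const std::string &suffix)
	{
		queue->push(objname+suffix);
	}
};

std::string resync_demo()
{
	auto queue=std::make_shared<name_queue>();
	per_object_subscriber pi("pi", queue);

	pi.connection_update(toy_req_stat::disconnected); // Queues nothing
	pi.connection_update(toy_req_stat::processed);    // Queues "pi"

	return queue->get();
}
```

The main loop needs no changes under this scheme: a name queued by connection_update() is indistinguishable from one queued by updated(), and both lead to the same retrieval of the object's current value.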

But it's easier to use manage_object(), for individual objects, or manage_hierarchymonitor(), for hierarchies.