stasher::client->subscribe(): object subscriptions

#include <iostream>
#include <stasher/client.H>
#include <x/mpobj.H>

#include <queue>

class mySubscriberObj : public stasher::client::base::subscriberObj {

public:
	mySubscriberObj() {}
	~mySubscriberObj()
	{
	}

	typedef x::mpcobj<std::queue<std::string> > queue_t;

	queue_t queue;

	void updated(const std::string &objname,
		     const std::string &suffix) override
	{
		queue_t::lock lock(queue);

		lock->push(objname+suffix);

		lock.notify_one();
	}

	std::string get()
	{
		queue_t::lock lock(queue);

		while (lock->empty())
			lock.wait();

		std::string s=lock->front();

		lock->pop();
		return s;
	}
};

void simplesubscribe(int argc, char **argv)
{
	if (argc < 2)
		throw EXCEPTION("Usage: simplesubscribe {object}+");

	stasher::client client=stasher::client::base::connect();

	auto subscriber=x::ref<mySubscriberObj>::create();

	std::list<x::ref<x::obj> > mcguffins;

	for (int i=1; i<argc; ++i)
	{
		stasher::subscriberesults
			res=client->subscribe(argv[i], subscriber);

		if (res->status != stasher::req_processed_stat)
			throw EXCEPTION(x::tostring(res->status));

		mcguffins.push_back(res->mcguffin);

		auto cancel_mcguffin=res->cancel_mcguffin; // NOT USED

		std::cout << "Subscribed to " << argv[i] << std::endl;
	}

	while (1)
	{
		std::string object=subscriber->get();

		stasher::client::base::getreq req
			=stasher::client::base::getreq::create();

		req->objects.insert(object);
		req->openobjects=true;

		stasher::contents contents=client->get(req)->objects;

		if (!contents->succeeded)
			throw EXCEPTION(contents->errmsg);

		auto iter=contents->find(object);

		if (iter == contents->end())
		{
			std::cout << object << " removed" << std::endl;
			continue;
		}

		std::string line;

		std::getline(*iter->second.fd->getistream(), line);

		if (line == "stop")
			break;
		std::cout << object << ": " << line << std::endl;
	}
}

int main(int argc, char **argv)
{
	try {
		simplesubscribe(argc, argv);
	} catch (const x::exception &e)
	{
		std::cerr << e << std::endl;
		exit(1);
	}

	return 0;
}

This is an example of a subscription. A subscription provides a notification mechanism when an object in the repository changes. This is done by installing a callback that gets invoked whenever the subscribed object gets created, updated, or deleted (a subscription can get installed for an object that does not exist at the time of the subscription).

It's possible to open more than one subscription, for different objects, at a time. This example subscribes to the list of objects given on the command line. Note, however that the server imposes a maximum limit on the number of opened subscriptions (this is not checked in this example).

The first parameter to stasher::client->subscribe() is the name of the object to open a subscription for. The second parameter is an x::ref to a subclass of stasher::client::base::subscriberObj that implements a updated() method as shown in this example.

After a subscription gets opened, updated() gets invoked every time the object named by the first parameter to subscribe() changes – it gets created, updated, or deleted. Passing an object name with a trailing / subscribes to an entire object hierarchy, with a special notation of referering to the top level hierarchy.

Example: opening a subscription for constants invokes updated() any time the object named constants gets created, updated, or deleted. Opening a subscription for constants/ invokes updated() any time an object named constants/e, constants/pi, or any constants/anything gets created, updated, or deleted. This also includes constants/imaginary/i, but not the constants object itself. Opening a subscription for invokes updated() for any object in the top level hierarchy, like e, pi.

A subscription to a hierarchy counts as one subscription towards the connection limits.

stasher::client->subscribe() returns a stasher::subscriberesults, which is an x::ref to a reference-counted object with the following members:

status

The subscription status status, with stasher::req_processed_stat when the subscription is succesfully opened.

mcguffin

A mcguffin representing the open subscription. There is no formal unsubscribe(), rather than subscription remains open as long as the mcguffin exists. Stopping the subscription involves simply letting the mcguffin go out of scope and get destroyed.

The above example puts each subscription's mcguffin on a list kept on the stack, so that the subscription remains in effect until the simplesubscribe() function returns.

While a subscription remains open, the client object holds a strong reference on the subscriber callback. When the mcguffin goes out of scope and gets destroyed, if the client connection thread is in the middle of invoking the updated() callback, or is busy with something else at the moment, there may be a slight delay before the subscription gets wrapped up, and the connection thread releases its reference on the subscriber callback object; and it's remotely possible that updated() can get invoked at the same time, or just after, the mcguffin goes out of scope and gets destroyed.

cancel_mcguffin

The flip side of the coin. This is a mcguffin that's owned by the client connection thread. When the subscription gets closed, for any reason, the client connection thread releases its reference on the mcguffin.

It's possible that an open subscription can get closed even before its mcguffin goes out of scope and gets destroyed, and the cancellation mcguffin provides the means for detecting this situation. The normal sequence of events when the subscription gets closed goes like this;

The last step also happens spontaneously in the event that the client connection thread's connection to the server breaks for any reason. The subscription does not get reopened automatically, when a new client connection thread reconnects to the stasher server. Attaching a destructor callback to the cancellation mcguffin (and releasing the reference on the cancellation mcguffin, and the stasher::subscriberesults object) provides the means for detecting and handling this situation.

Note

The cancellation mcguffin's destructor callbacks also gets invoked when the subscription gets closed in the regular way, by destroying the subscription mcguffin.

The above example, prints the names of the created/updated/deleted objects until any object's contents are set to a single line with the word "stop".

The updated() callback

updated() receives two std::strings as parameters. The first parameter is always the same as the original parameter to subscribe(). For an open subscription for a single object the second parameter is always an empty string. For an open subscription for an object hierarchym, the second parameter is the name of the object in the subscribed hierarchy that got created, updated, or deleted.

Note that concatenating the two together gets the complete name of the affected object, in all situations. The same subscriber callback can get installed for multiple open subscriptions, and the parameters to updated() indicate which specific object the callback gets invoked for.

The callback gets invoked by the client connection thread, so it should not engage in any lengthy, prolonged activities, nor throw any exceptions. Furthermore, it should not invoke stasher::client methods itself. Because it's the actual client connection thread, the outcome of such a lapse in judgement is typically a thread deadlock. In this example, the callback puts the full name of the object into a queue. The main thread keeps reading the name of each object from the queue, and invokes get() to retrieve the object's contents.

Note

It's possible that the same object gets updated two times in a row, very quickly; and the second update completes before the application has an opportunity to respond to the first callback, and send the request for the object's new contents. The application ends up missing the first update, and gets the value of the object after the second update. The application then responds to the second update notification it receives, invoking the callback; then ends up getting the same value, the object's value after the second update.

Here, an update refers to the object getting created, updated, or deleted. Both the first and the subsequent update included. An application must be prepared to handle this possibility.

Note

The limitations on callback methods described later, in the section called “What asynchronous C++ API methods can and cannot do”, apply to these callbacks too.

Note

Some parts of the etc hierarchy in the root hierarchy namespace, namely etc/done are for stasher's internal use. The updated() callback for this hierarchy gets suppressed internally.