stasher::versionedptr<classptr>: associate a mcguffin with each object instance

#include <x/ptr.H>
#include <stasher/versionedptr.H>

class inventoryObj;

typedef x::ptr<inventoryObj> inventoryptr;

stasher::versionedptr<inventoryptr> instock;

// ...

instock=inventoryptr::create();

if (instock.null())
{
  // ...
}

instock->method();

inventoryptr p=instock;

// ...

instock.getversion()->addOnDestroy(/* ... */);

stasher::versionedptr's template parameter is an x::ptr. The stasher::versionedptr contains an instance of the x::ptr. null(), and the *() and ->() operators, get forwarded to the x::ptr instance.

stasher::versionedptr defines a conversion operator that returns a const reference to the contained x::ptr, and an assignment operator that replaces the x::ptr. Additionally, stasher::versionedptr contains a mcguffin object that gets replaced by the assignment operator. Assigning any x::ptr, including a nullptr, to a stasher::versionedptr creates a new mcguffin, which replaces the existing one. getversion() returns the current x::ptr's mcguffin.

This is slightly different from using the object that the x::ptr refers to as a mcguffin, because a mcguffin gets created even for a nullptr. Any assignment operation creates a new mcguffin object, and the stasher::versionedptr releases its reference on the previous mcguffin. This includes replacing a nullptr with another nullptr.
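The assignment rule can be illustrated with a minimal sketch that uses only the standard library. This is not the stasher implementation: versioned_sketch, version_token_sketch and null_assignment_changes_version() are hypothetical names, and std::shared_ptr stands in for x::ptr. The sketch only shows that each assignment installs a fresh version object, even when a null pointer replaces a null pointer.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stand-in for the mcguffin: an empty object whose lifetime
// marks one version of the value.
struct version_token_sketch {};

// Hypothetical stand-in for stasher::versionedptr, built on std::shared_ptr.
template<typename T>
class versioned_sketch {
	std::shared_ptr<T> value_;
	std::shared_ptr<version_token_sketch> version_ =
		std::make_shared<version_token_sketch>();

public:
	// Every assignment, even of a null pointer, installs a new version
	// and releases the reference on the previous one.
	versioned_sketch &operator=(std::shared_ptr<T> p)
	{
		value_ = std::move(p);
		version_ = std::make_shared<version_token_sketch>();
		return *this;
	}

	// Forwarded to the contained pointer.
	bool null() const { return !value_; }
	T *operator->() const { return value_.get(); }
	operator const std::shared_ptr<T> &() const { return value_; }

	std::shared_ptr<version_token_sketch> getversion() const
	{
		return version_;
	}
};

// Returns true if replacing a null value with another null value still
// retires the previous version object.
inline bool null_assignment_changes_version()
{
	versioned_sketch<int> v;

	assert(v.null());

	// Observe the initial version without keeping it alive.
	std::weak_ptr<version_token_sketch> before = v.getversion();

	v = std::shared_ptr<int>(); // null replaced by null

	return v.null() && before.expired();
}
```

In this model null_assignment_changes_version() returns true: the weak reference to the first version expires as soon as the second null pointer is assigned.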

Note

The stasher::versionedptr itself employs no locking, and is not thread safe. Locking and mutual exclusion of a stasher::versionedptr must be implemented separately. Conveniently, a stasher::current<classptr> takes care of that.

Versioned current objects

stasher::current<classptr> implements managed object callbacks for classptrs, where classptr is an x::ptr to a class. The callbacks construct a new classptr, which is a nullptr if the object was removed from the object repository. Otherwise, the file descriptor of the object, and its uuid, get passed to the object class's constructor, and the new object instance becomes the classptr. The resulting classptr gets stored in stasher::current's current_value member, which is a mutex-protected object. Its value member is an instance of the classptr that represents the current value of the object in the object repository.

Note

The value is a nullable reference pointer because it is a nullptr when the object gets removed from the object repository, or when it never existed in the first place. It is also null immediately after the versioned current object gets instantiated, before the current contents of the object are retrieved from the object repository.
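The idea of a mutex-protected object that is reachable only through a lock can be sketched with the standard library alone. The names below (guarded_sketch, current_value_sketch, describe(), demo_guarded()) are hypothetical and this is not how stasher::current is implemented. It only mirrors the "construct a lock, then use lock->value" pattern that the examples in this part rely on, including the null check.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical mutex-protected container. The value is reachable only
// through a lock object, which holds the mutex for its lifetime.
template<typename T>
class guarded_sketch {
	std::mutex mutex_;
	T value_{};

public:
	class lock {
		std::unique_lock<std::mutex> held_;
		T &value_;

	public:
		explicit lock(guarded_sketch &g)
			: held_(g.mutex_), value_(g.value_)
		{
		}

		T *operator->() { return &value_; }
	};
};

// Hypothetical stand-in for the protected structure: the value is null
// until something gets loaded into it.
struct current_value_sketch {
	std::shared_ptr<std::string> value;
};

inline std::string describe(guarded_sketch<current_value_sketch> &current)
{
	guarded_sketch<current_value_sketch>::lock lock(current);

	return lock->value ? *lock->value : std::string("(none)");
}

// Returns true if the value reads as "(none)" before it is set, and as
// the stored string afterwards.
inline bool demo_guarded()
{
	guarded_sketch<current_value_sketch> current;

	bool initially_null = describe(current) == "(none)";

	{
		guarded_sketch<current_value_sketch>::lock lock(current);

		lock->value = std::make_shared<std::string>("10 bananas");
	}

	assert(describe(current) == "10 bananas");
	return initially_null;
}
```

The lock is released at the end of each scope, so describe() can acquire it again after the update.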

The stasher::current<classptr> template class has an optional second template parameter, which defaults to the classptr. If specified, the second template parameter declares the class of current_value.value. Giving a stasher::versionedptr<classptr> for the second template parameter results in a versioned current object. Using a stasher::versionedptr results in each update to the object's value installing an associated mcguffin, replacing the previous value's mcguffin.

The examples in this part use a warehousesObj object which contains a map of individual warehouse objects, keyed by the name of the object in the stasher repository. The value of the map is a versioned current object:

#ifndef warehouses_H
#define warehouses_H

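#include <stasher/current.H>
#include <stasher/client.H>
#include <stasher/manager.H>
#include "inventory.H"

// Standard headers used directly by this file.
#include <iomanip>
#include <iostream>
#include <list>
#include <map>
#include <string>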

class warehousesObj;
typedef x::ref<warehousesObj> warehouses_t;

// A warehouse is a versioned current object.

typedef stasher::current<inventoryptr, stasher::versionedptr<inventoryptr>
			 > warehouse_t;

// All of our warehouses are here.

class warehousesObj : virtual public x::obj {
public:
	warehousesObj() {}
	~warehousesObj() {}

	// Warehouse, by name.

	std::map<std::string, warehouse_t> warehouses;

	// When parsing command line options, create a warehouse dynamically,
	// from the command line list.
	void createwarehouse(const std::string &s)
	{
		if (warehouses.find(s) == warehouses.end())
			warehouses.insert(std::make_pair(s, warehouse_t
							 ::create()));
	}

	void inventory()
	{
		for (auto &warehouse:warehouses)
		{
			std::cout << warehouse.first << ":" << std::endl;

			warehouse_t::base::current_value_t::lock
				lock(warehouse.second->current_value);

			if (lock->value.null())
			{
				std::cout << "    (none)" << std::endl;
				continue;
			}

			std::cout << "    "
				  << std::setw(30) << std::left
				  << "Item"
				  << "   "
				  << std::setw(8) << std::right
				  << "Count" << std::setw(0)
				  << std::endl;

			std::cout << "    "
				  << std::setfill('-') << std::setw(30)
				  << ""
				  << "   "
				  << std::setw(8)
				  << "" << std::setw(0) << std::setfill(' ')
				  << std::endl;

			for (auto &item:lock->value->stock)
			{
				std::cout << "    "
					  << std::setw(30) << std::left
					  << item.first
					  << "   "
					  << std::setw(8) << std::right
					  << item.second << std::setw(0)
					  << std::endl;
			}
			std::cout << std::setw(75) << std::setfill('=') << ""
				  << std::setw(0) << std::setfill(' ')
				  << std::endl;
		}
	}
};

// Create the managed object subscriptions, and wait for the objects to get
// loaded from the database.

void load_warehouses(// Our warehouses
		     const warehouses_t &warehouses,

		     // The client
		     const stasher::client &client,

		     // The connection manager
		     const stasher::manager &manager,

		     // Mcguffins for managed object subscriptions are placed here
		     std::list<x::ref<x::obj> > &mcguffins);

#endif

One situation where a stasher::versionedptr's mcguffin comes in handy is load_warehouses(). This function sets up current objects' subscriptions, and waits until the application receives the initial value of each object. load_warehouses() gets called after the map container of all warehouses gets initialized. load_warehouses() invokes each current object's manage(). Before opening a subscription, a destructor callback gets attached to the version mcguffin for each current object's initial value.

As previously described, the initial value of each object does not get retrieved immediately, so load_warehouses() waits until it happens. Once the current value of each object is retrieved, the new version mcguffin replaces the previous one. When all previous versions of all objects get replaced, the destructor callback itself goes out of scope and gets destroyed, so load_warehouses() only needs to wait until that happens, using a destructor guard. See LIBCXX for more information.
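The reasoning above, that the shared destructor callback goes away only after every initial version has been replaced, can be modelled with std::shared_ptr. This is a single-threaded sketch with hypothetical names (completion_sketch, initial_version_sketch, replace_initial_versions()); it does not use the LIBCXX destructor callback or guard classes. The real load_warehouses() blocks on a guard while another thread delivers the updates; here the replacements happen inline so that the order of events is visible.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for the shared destructor callback: it records
// the moment when nothing refers to it any more.
struct completion_sketch {
	bool &flag;

	explicit completion_sketch(bool &f) : flag(f) {}
	~completion_sketch() { flag = true; }
};

// Hypothetical stand-in for a version mcguffin. An attached callback is
// kept alive by a strong reference until the version is destroyed.
struct initial_version_sketch {
	std::shared_ptr<completion_sketch> on_destroy;
};

struct wait_result_sketch {
	std::size_t replaced; // replacements made before completion
	bool done;            // whether the callback object got destroyed
};

inline wait_result_sketch replace_initial_versions(std::size_t nslots)
{
	bool done = false;
	std::vector<std::shared_ptr<initial_version_sketch>> slots(nslots);

	{
		auto completion = std::make_shared<completion_sketch>(done);

		for (auto &slot : slots)
		{
			slot = std::make_shared<initial_version_sketch>();
			slot->on_destroy = completion;
		}
	} // only the initial versions keep the callback alive now

	assert(nslots == 0 || !done);

	std::size_t replaced = 0;

	for (auto &slot : slots)
	{
		if (done)
			break; // would mean the callback was destroyed early

		// A new value arrives: the previous version gets destroyed.
		slot = std::make_shared<initial_version_sketch>();
		++replaced;
	}
	return wait_result_sketch{replaced, done};
}
```

With three slots, the callback object is destroyed exactly when the third initial version is replaced, so the result reports three replacements and done set to true.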

If there was a problem establishing the subscription, the initial value of an object does not get installed. To check for this possibility, load_warehouses() waits until each current object's connection_update() method gets called with anything other than stasher::req_disconnected_stat. Any status other than stasher::req_processed_stat is a subscription error. Once all subscriptions have a stasher::req_processed_stat, load_warehouses() waits for the destructor callback to go out of scope and get destroyed, and everything is ready:

#include <x/destroycallbackflag.H>
#include <x/destroycallback.H>
#include "warehouses.H"

void load_warehouses(// Our warehouses
		     const warehouses_t &warehouses,

		     // The client
		     const stasher::client &client,

		     // The connection manager
		     const stasher::manager &manager,

		     // Mcguffins for managed object subscriptions are placed here
		     std::list<x::ref<x::obj> > &mcguffins)
{
	// We'll attach a destructor callback to the warehouses' current
	// version mcguffins.
	//
	// Each warehouse is constructed, and it has a value. It's a null
	// inventoryptr, of course. When the subscription gets opened and
	// managed, the current object in the repository gets loaded, and
	// the warehouse object initialized with that object. So the original
	// null inventoryptr gets replaced, even if the object does not exist
	// in the repository -- in that case a null inventoryptr gets
	// symbolically placed in current_value.value.
	//
	// So, in either case, the version mcguffin of the initial null
	// inventoryptr is going to get destroyed.
	//
	// Attach a dummy destructor callback to each warehouse's initial
	// version mcguffin. Then wait for the dummy destructor to get
	// destroyed. A mcguffin with destructor callbacks keeps a strong
	// reference on the callbacks, until the object gets destroyed, and the
	// callbacks get invoked. All the warehouses get this destructor
	// callback attached to them, which does nothing. When all of them get
	// destroyed, then this destructor mcguffin gets destroyed. So, we
	// wait for the destructor callback itself to get destroyed.

	x::destroyCallbackFlag::base::guard guard;

	auto destructor_cb=x::destroyCallback::create();

	for (auto &warehouse:warehouses->warehouses)
	{
		warehouse_t::base::current_value_t::lock
			lock(warehouse.second->current_value);

		lock->value.getversion()->addOnDestroy(destructor_cb);

		mcguffins.push_back(warehouse.second->manage(manager, client,
							     warehouse.first));
	}

	std::cout << "Waiting for objects to get loaded"
		  << std::endl;

	// We want to make sure that the subscriptions were set up. Wait for
	// the status to be anything other than req_disconnected_stat.

	for (auto &warehouse:warehouses->warehouses)
	{
		warehouse_t::base::current_value_t::lock
			lock(warehouse.second->current_value);

		lock.wait([&lock]
			  {
				  return lock->connection_status !=
					  stasher::req_disconnected_stat;
			  });

		if (lock->connection_status != stasher::req_processed_stat)
			// Some kind of a subscription error
			throw EXCEPTION(warehouse.first + ": "
					+ x::tostring(lock->connection_status));
	}

	// Ok, just because everyone connected, doesn't mean that all the
	// objects have been loaded yet.

	guard(destructor_cb);
}

Using version mcguffins

#include <x/destroycallback.H>
#include <x/destroycallbackflag.H>
#include <stasher/client.H>
#include <stasher/manager.H>
#include <stasher/current.H>
#include <stasher/versionedptr.H>
#include <stasher/process_request.H>
#include "inventory.H"
#include "warehouses.H"

#include <sstream>
#include <list>
#include <map>
#include <type_traits>


// Inventory adjustment

// The parameters to adjinventory are groups of three:
// name, what, howmany.
//
// "name" gives the name of an inventoryptr in the object repository.
// "what" is the inventory item's name. "howmany" is how many of "what" to be
// adjusted in the "name" inventory object.
//
// Note that the number of inventory objects, called warehouses here, that
// can be processed is limited by the maximum number of subscriptions, however
// there's no limit (within reason) as to the actual number of adjustments to
// same or different inventory items in different warehouses. The
// same warehouse may have multiple adjustments of the same or different item.
//
// All adjustments get processed at the same time. There is no guarantee of the
// order of adjustments, so some of them may be rejected due to insufficient
// inventory in the warehouse (even if another adjustment was given of the same
// inventory item in the same warehouse, that would've increased the inventory
// level sufficiently).

class adjinfoObj;
typedef x::ref<adjinfoObj> adjinfo;

void do_adjustments(const stasher::client &client,
		    const warehouses_t &warehouses,
		    std::list<adjinfo> &adjs);

// Information about an adjustment: the name of the warehouse,
// the item, and how many of them.

class adjinfoObj : virtual public x::obj {

public:
	std::string name;
	std::string what;
	int howmuch;

	// The processing status gets placed here

	stasher::req_stat_t processed;

	adjinfoObj(const std::string &nameArg,
		   const std::string &whatArg,
		   int howmuchArg) : name(nameArg),
				     what(whatArg), howmuch(howmuchArg)
	{
	}

	~adjinfoObj()
	{
	}

	std::string descr() const
	{
		std::ostringstream o;

		o << "adjust " << what << " by " << howmuch
		  << " in " << name;
		return o.str();
	}
};

void adjinventory(int argc, char **argv)
{
	// The list of adjustments parsed from the command line.

	std::list<adjinfo> adjs;

	// All the warehouses named in the adjustments, combined:
	warehouses_t warehouses=warehouses_t::create();

	// Parse command line options.

	for (int i=1; i+2 < argc; i += 3)
	{
		int n=0;

		std::istringstream(argv[i+2]) >> n;

		auto adjinfo=adjinfo::create(argv[i], argv[i+1], n);

		warehouses->createwarehouse(adjinfo->name);
		adjs.push_back(adjinfo);
	}

	auto client=stasher::client::base::connect();
	auto manager=stasher::manager::create();

	// Load the existing inventory, start subscriptions

	std::list<x::ref<x::obj> > mcguffins;

	load_warehouses(warehouses, client, manager, mcguffins);

	std::cout << "Existing inventory:" << std::endl;

	warehouses->inventory();

	// Perform the adjustments. If any of them were rejected with
	// req_rejected_stat, repeat them.

	do
	{
		// Submit the adjustments, wait for them to get processed.

		do_adjustments(client, warehouses, adjs);

		for (auto b=adjs.begin(), e=adjs.end(), p=b;
		     b != e; )
		{
			p=b;
			++b;

			if ((*p)->processed != stasher::req_rejected_stat)
				adjs.erase(p);
		}
	} while (!adjs.empty());
}

// Helper object to invoke a functor, that takes a stasher::putresults as an
// argument, from a destructor callback.

template<typename Functor> class invokeProcessedFunctorObj
	: public x::destroyCallbackObj {

public:

	Functor f;
	stasher::putresults res;

	template<typename FunctorArg>
	invokeProcessedFunctorObj(FunctorArg && fArg,
				  const stasher::putresults &resArg)
		: f(std::forward<FunctorArg>(fArg)), res(resArg)
	{
	}

	~invokeProcessedFunctorObj()
	{
	}

	void destroyed() noexcept
	{
		f(res);
	}
};

// Lock the inventory object in a warehouse. Pass the inventory object to a
// functor. The functor is expected to return a transactionptr object. If it's
// null, the functor doesn't want to do a transaction.
//
// Returns a std::pair of the transactionptr object, and the current inventory
// object's version mcguffin.

template<typename Functor>
std::pair<stasher::client::base::transactionptr, x::ref<x::obj> >
make_transaction(const warehouse_t &warehouse, Functor &&f)
{
	warehouse_t::base::current_value_t::lock
		lock(warehouse->current_value);

	return std::make_pair(f((const inventoryptr &)lock->value),
			      lock->value.getversion());
}

// The first functor gets passed to make_transaction. The returned transaction
// is submitted to the object repository. The second functor gets invoked with
// the transaction's stasher::putresults as an argument.
//
// If the transaction fails with a req_rejected_stat, the functor gets called
// only after the original inventory object has already been updated with
// the new value of the inventory object in the object repository. This is done
// by invoking it as part of the destructor callback of the original object's
// version mcguffin. This is an immediate process if the inventory object has
// already been updated, because our reference would be the last one on the
// mcguffin, so its destructor callback will get invoked immediately, after
// the destructor callback gets installed, and everything goes out of scope.


template<typename MakeTransactionFunctor,
	 typename ProcessTransactionFunctor>
void do_adjustment(const stasher::client &client,
		   const warehouse_t &warehouse,
		   MakeTransactionFunctor && make_tran,
		   ProcessTransactionFunctor && process_tran)
{
	auto transaction=make_transaction(warehouse,
					  std::forward<MakeTransactionFunctor>
					  (make_tran));

	if (transaction.first.null())
		return; // The first functor did not want to do it.

	auto mcguffin=transaction.second;

	stasher::process_request(client->put_request(transaction.first),
				 [process_tran, mcguffin]
				 (const stasher::putresults &res)
				 {
					 auto callback=x::ref<
						 invokeProcessedFunctorObj
						 <ProcessTransactionFunctor>

						 >::create(std::move
							   (process_tran),
							   res);

					 if (res->status ==
					     stasher::req_rejected_stat)
					 {
						 mcguffin->addOnDestroy
							 (callback);
						 return;
					 }

					 // Invoke the functor now.

					 callback->destroyed();
				 });
}

// Create a transaction that adjusts the inventory
// This is called by the first functor in do_adjustment().

stasher::client::base::transactionptr
adjust_inventory(const inventoryptr &ptr,
		 const adjinfo &adj)
{
	inventoryptr i;

	if (adj->howmuch == 0)
	{
		std::cout << "Very funny: " + adj->descr() + "\n"
			  << std::flush;
		return stasher::client::base::transactionptr();
	}

	if (ptr.null())
	{
		// No existing inventory object

		if (adj->howmuch < 0)
		{
			std::cout << "Insufficient stock: " + adj->descr()
				+ "\n"
				  << std::flush;
			return stasher::client::base::transactionptr();
		}

		i=inventory::create();
		i->stock[adj->what]=adj->howmuch;
	}
	else
	{
		i=inventoryptr::create(*ptr); // Clone the object

		inventoryObj &newi= *i;

		// Find existing item. Make it 0, if it does not exist.
		auto iter=newi.stock.find(adj->what);

		if (iter == newi.stock.end())
			iter=newi.stock.insert(std::make_pair(adj->what, 0))
				.first;

		if (adj->howmuch < 0 && -iter->second > adj->howmuch)
		{
			std::cout << "Insufficient stock: " + adj->descr()
				+ "\n"
				  << std::flush;
			return stasher::client::base::transactionptr();
		}

		iter->second += adj->howmuch;

		// Delete 0-stocked item. If this is the last item in the
		// inventory, delete the entire object.

		if (iter->second == 0)
			newi.stock.erase(iter);

		if (newi.stock.empty())
		{
			auto tran=stasher::client::base::transaction::create();

			tran->delobj(adj->name, ptr->uuid);
			return tran;
		}
	}

	// Serialize the new inventory object, then insert or update it.

	std::string s;

	typedef std::back_insert_iterator<std::string> insert_iter_t;

	insert_iter_t insert_iter(s);

	x::serialize::iterator<insert_iter_t> ser_iter(insert_iter);

	i->serialize(ser_iter);

	auto tran=stasher::client::base::transaction::create();

	if (ptr.null())
	{
		// No existing object, insert it.

		tran->newobj(adj->name, s);
	}
	else
	{
		// Update it.

		tran->updobj(adj->name, ptr->uuid, s);
	}
	return tran;
}

void do_adjustments(const stasher::client &client,
		    const warehouses_t &warehouses,
		    std::list<adjinfo> &adjs)
{
	// Each functor, below, captures this mcguffin by value.
	// Guard the mcguffin. When it is completely destroyed, it means that
	// all transactions have been processed.

	x::destroyCallbackFlag::base::guard guard;

	x::ref<x::obj> mcguffin=x::ref<x::obj>::create();

	guard(mcguffin);

	for (auto &adj : adjs)
	{
		// Be optimistic.
		adj->processed=stasher::req_processed_stat;

		do_adjustment(client,
			      warehouses->warehouses.find(adj->name)
			      ->second,
			      [adj]
			      (const inventoryptr &ptr)
			      {
				      std::cout << "Adjusting: "
					      + adj->descr() + "\n"
						<< std::flush;
				      return adjust_inventory(ptr, adj);
			      },

			      [adj, mcguffin]
			      (const stasher::putresults &res)
			      {
				      std::cout << x::tostring(res->status)
						+ ": "
						+ adj->descr() + "\n"
						<< std::flush;

				      adj->processed=res->status;
			      });
	}
}

int main(int argc, char **argv)
{
	try {
		adjinventory(argc, argv);
	} catch (const x::exception &e)
	{
		std::cerr << e << std::endl;
		return 1;
	}
	return 0;
}

This is an example of using stasher::versionedptr to properly handle a stasher::req_rejected_stat error. adjinventory.C takes a list of adjustments on the command line. Each adjustment consists of a name of an inventory object, the name of an inventory item, and an adjustment. All adjustments get processed at once, and each adjustment that fails because of a stasher::req_rejected_stat gets tried again.

Here's what happens when there are two inventory objects, east with 10 bananas, and west with 10 apples, and four adjustments: reduce the number of bananas in east by two, add two bananas to west, reduce the number of apples in west by two, add two apples to east:

$ ./adjinventory east bananas -2 west bananas 2 west apples -2 east apples 2
Waiting for objects to get loaded
Existing inventory:
east:
    Item                                Count
    ------------------------------   --------
    bananas                                10
===========================================================================
west:
    Item                                Count
    ------------------------------   --------
    apples                                 10
===========================================================================
Adjusting: adjust bananas by -2 in east
Adjusting: adjust bananas by 2 in west
Adjusting: adjust apples by -2 in west
Adjusting: adjust apples by 2 in east
Transaction/request processed: adjust bananas by -2 in east
Transaction/request processed: adjust bananas by 2 in west
Collision detected - object uuid mismatch: adjust apples by -2 in west
Collision detected - object uuid mismatch: adjust apples by 2 in east
Adjusting: adjust apples by -2 in west
Adjusting: adjust apples by 2 in east
Transaction/request processed: adjust apples by -2 in west
Transaction/request processed: adjust apples by 2 in east

Here, make_transaction() locks the current object value. In this case, the object is a warehouse object, and the value is an inventory object. make_transaction() retrieves the existing object's version mcguffin from getversion(), then creates a transaction that updates this object.

Note

Applications should not modify stasher::current objects. As shown here, applications should create transactions and process them, then leave it up to the managed stasher::current object to update itself automatically, to reflect the updated object in the stasher object repository. Specifically, the requirements are:

  • Invoking the current object's manage() method, to open a subscription. The manager object and the subscription mcguffin must remain in scope for the subscription to be active.

  • The versioned object, if it exists and is not a nullptr, presumably holds the uuid of the repository's object that it was constructed from.

  • The object's version mcguffin, and the transaction updating that object get used together.

Only under these circumstances does a contract exist: if the transaction fails with a stasher::req_rejected_stat, the managed object subscription is going to update the versioned object, and the version mcguffin will go out of scope and get destroyed. The stasher::req_rejected_stat failure must be because the object in the repository was simultaneously updated, and the application has not yet received the updated object, which is on its way.

do_adjustment() calls make_transaction() to prepare each transaction, saves the original object's version mcguffin, then processes the transaction, and invokes the ProcessTransactionFunctor with the transaction's results. If the result was a stasher::req_rejected_stat, the functor gets invoked from the destructor callback; otherwise it gets invoked immediately.

For simplicity, do_adjustment() always constructs a destructor callback that invokes the functor, but actually installs the destructor callback on the version mcguffin only in the event of a stasher::req_rejected_stat; and invokes it itself in all other cases.
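The ordering that this produces can be shown with a small standard-library model. The names (status_sketch, old_version_sketch, deliver(), rejected_then_updated()) are hypothetical, and a vector of std::function objects stands in for the destructor callbacks attached with addOnDestroy(). It is a sketch of the technique, not of the stasher classes.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

enum class status_sketch { processed, rejected };

// Hypothetical version mcguffin: runs its attached callbacks when the
// last reference to it goes away.
struct old_version_sketch {
	std::vector<std::function<void()>> on_destroy;

	~old_version_sketch()
	{
		for (auto &cb : on_destroy)
			cb();
	}
};

// Run the handler right away, unless the request was rejected. In that
// case, postpone it until the version that the request was based on has
// been replaced.
inline void deliver(status_sketch status,
		    const std::shared_ptr<old_version_sketch> &version,
		    std::function<void()> handler)
{
	if (status == status_sketch::rejected)
	{
		version->on_destroy.push_back(std::move(handler));
		return;
	}
	handler();
}

// Returns the order in which the events were observed.
inline std::vector<std::string> rejected_then_updated()
{
	std::vector<std::string> log;
	auto current = std::make_shared<old_version_sketch>();

	{
		// The version saved when the transaction was prepared.
		auto saved = current;

		deliver(status_sketch::rejected, saved,
			[&log] { log.push_back("handler"); });
	}
	assert(log.empty()); // the handler is still postponed

	log.push_back("update arrives");

	// The subscription installs a new value: the old version is
	// destroyed here, which finally runs the handler.
	current = std::make_shared<old_version_sketch>();

	return log;
}
```

In this model the handler for a rejected request runs only after the update arrives. A processed request would have run its handler inside deliver().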

The end result is that, in the event of an update collision, the functor that processes the results of the transaction gets invoked with a stasher::req_rejected_stat only after the original object got replaced by the updated object value from the repository. do_adjustments() takes a list of all the adjustments specified on the command line, and starts asynchronous transactions for all of them, at the same time. As shown in the above example, when multiple transactions affect the same object, they're likely to collide. The actual output can vary under heavy client or server load, due to the natural variations that are inherent to multithreaded applications, but the shown output is the typical result.
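Why two adjustments to the same object collide can be seen in a toy model of a uuid-checked update. Everything here is an assumption made for illustration: repo_sketch, its methods, and the use of a counter as a uuid are hypothetical, and the real repository's behavior is only inferred from the "object uuid mismatch" message in the output above.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

using stock_sketch = std::map<std::string, int>;

// Toy in-memory repository. An update is accepted only if it names the
// uuid of the object's current contents.
class repo_sketch {
	struct entry {
		unsigned uuid;
		stock_sketch stock;
	};

	std::map<std::string, entry> objects_;
	unsigned next_uuid_ = 1;

public:
	unsigned insert(const std::string &name, stock_sketch stock)
	{
		unsigned uuid = next_uuid_++;

		objects_[name] = entry{uuid, std::move(stock)};
		return uuid;
	}

	unsigned uuid_of(const std::string &name) const
	{
		return objects_.at(name).uuid;
	}

	const stock_sketch &stock_of(const std::string &name) const
	{
		return objects_.at(name).stock;
	}

	// Returns false, the analogue of a rejected request, when based_on
	// is stale.
	bool update(const std::string &name, unsigned based_on,
		    stock_sketch stock)
	{
		auto iter = objects_.find(name);

		if (iter == objects_.end() || iter->second.uuid != based_on)
			return false;

		iter->second = entry{next_uuid_++, std::move(stock)};
		return true;
	}
};

// Two adjustments are prepared from the same snapshot. The second one
// is rejected, then succeeds when rebuilt from the refreshed object.
inline std::vector<bool> collide_and_retry()
{
	repo_sketch repo;
	unsigned seen = repo.insert("west", {{"apples", 10}});

	stock_sketch add_bananas = repo.stock_of("west");
	add_bananas["bananas"] += 2;

	stock_sketch remove_apples = repo.stock_of("west");
	remove_apples["apples"] -= 2;

	std::vector<bool> results;

	results.push_back(repo.update("west", seen, add_bananas));
	assert(repo.stock_of("west").at("bananas") == 2);

	results.push_back(repo.update("west", seen, remove_apples));

	stock_sketch retry = repo.stock_of("west");
	retry["apples"] -= 2;

	results.push_back(repo.update("west", repo.uuid_of("west"), retry));
	return results;
}
```

The first update is accepted, the second is rejected because it still names the old uuid, and the retry is accepted.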

Finally, adjinventory collects the results of all transactions, and repeats the ones that failed with a stasher::req_rejected_stat.
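The retry loop reduces to "submit everything, keep only what was rejected, repeat". A minimal sketch, using std::list::remove_if instead of the manual iterator loop in adjinventory(), could look like this. The names (outcome_sketch, adjustment_sketch, run_until_settled(), demo_rounds()) are hypothetical, and the submit callable is a stand-in for do_adjustments().

```cpp
#include <cassert>
#include <functional>
#include <list>
#include <string>
#include <utility>

enum class outcome_sketch { processed, rejected };

struct adjustment_sketch {
	std::string descr;
	outcome_sketch outcome;

	explicit adjustment_sketch(std::string d)
		: descr(std::move(d)), outcome(outcome_sketch::processed)
	{
	}
};

using batch_sketch = std::list<adjustment_sketch>;

// Calls submit() on the pending adjustments until none of them come
// back rejected. Returns the number of rounds it took.
inline int run_until_settled(batch_sketch &pending,
			     const std::function<void(batch_sketch &)> &submit)
{
	int rounds = 0;

	while (!pending.empty())
	{
		submit(pending);
		++rounds;

		// Keep only the rejected adjustments for another round.
		pending.remove_if([](const adjustment_sketch &a) {
			return a.outcome != outcome_sketch::rejected;
		});
	}
	return rounds;
}

inline int demo_rounds()
{
	batch_sketch pending;

	pending.emplace_back("adjust bananas by 2 in west");
	pending.emplace_back("adjust apples by -2 in west");

	bool first_round = true;

	// Stand-in for the repository: in the first round, only the first
	// adjustment goes through, and the second one collides.
	int rounds = run_until_settled(pending, [&](batch_sketch &batch) {
		bool first_entry = true;

		for (auto &a : batch)
		{
			a.outcome = (first_round && !first_entry)
				? outcome_sketch::rejected
				: outcome_sketch::processed;
			first_entry = false;
		}
		first_round = false;
	});

	assert(pending.empty());
	return rounds;
}
```

In this model the two adjustments settle in two rounds. Unlike the do/while loop in adjinventory(), this sketch does nothing when the list starts out empty.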