stasher::versioned_put: updating multiple versioned objects

Extending the process of updating a versioned current object to multiple versioned current objects is deceptively simple:

A stasher::req_rejected_stat indicates that at least one of the objects involved in the transaction was updated and is no longer the current object in the repository. When a transaction updates multiple objects, the stasher::req_rejected_stat does not identify which one it was; therefore, it's necessary to wait until any one of them does.

This can be implemented manually by using the onAnyDestroyed() template, as described in LIBCXX's documentation, but the following templates and classes implement this more conveniently.

stasher::versionscollected: a container for current object version mcguffins

#include <stasher/versionscollected.H>

typedef stasher::current<inventoryptr, stasher::versionedptr<inventoryptr>
			 > warehouse_t;

std::pair<stasher::client::base::transaction,
	  stasher::versionscollected>
create_versioned_put(const warehouse_t &a,
		     const warehouse_t &b)

{
    auto transaction=stasher::client::base::transaction::create();
    auto versions=stasher::versionscollected::create();

    warehouse_t::base::current_value_t::lock
         alock(a->current_value),
         block(b->current_value);

    versions->add(alock->value);
    versions->add(block->value);

    // create the transaction here...

    return std::make_pair(transaction, versions);
}

stasher::versionscollected is an x::ref for a container of version mcguffins of objects that form a transaction. The above example takes two versioned current object, and locks their current_values. As noted previously, when locking multiple objects they must get locked in a specific order, in order to avoid deadlocks; but this is outside of the scope of this example, which presumes that the two warehouse_ts are different and always get given in the right order.

After locking the current values of both objects, each current_value.value, which is a stasher::versionedptr, gets passed to stasher::versionscollected->add(). This method invokes stasher::versionedptr's getversion() and stores a weak reference to the version mcguffin in the stasher::versionscollected. stasher::versionscollected does not need to store a strong reference on the version mcguffin. Checking whether the weakly-referenced mcguffin went out of scope and got destroyed is sufficient for the purpose of determining whether the versioned object was updated already.

For convenience create_from() takes a variadic list of stasher::versionedptr as parameters:

stasher::versionscollected versions=
    stasher::versionscollected::base::create_from(alock->value, block->value);

This is equivalent to invoking stasher::versionscollected::create(), then add()ing both stasher::versionedptrs.

A create_fromiter() takes a beginning and an ending input iterator over stasher::versionedptrs, then constructs a new stasher::versionscollected, add()ing version mcguffins from the input sequence.

Using stasher::versioned_put()

xferinventory.C is a modified version of adjinventory.C that updates multiple versioned objects in the same transaction, and shows how to handle stasher::req_rejected_stat with stasher::versioned_put():

#include <x/destroycallback.H>
#include <x/destroycallbackflag.H>
#include <stasher/client.H>
#include <stasher/manager.H>
#include <stasher/current.H>
#include <stasher/versionedptr.H>
#include <stasher/versionedput.H>
#include "inventory.H"
#include "warehouses.H"

#include <sstream>
#include <list>
#include <map>

// Inventory transfer.

// The parameters to xferinventory are groups of four:
// from, to, what, howmany.
//
// "from" and "to" are names of inventoryptr objects.
//
// "what" is the inventory name. "howmany" is how many of "what" to be
// transferred from the "from" inventory object to the "to" one.
//
// Note that the number of inventory objects, called warehouses here, that
// can be processed is limited by the maximum number of subscriptions, however
// there's no limit (within reason) as to the actual number of transfers of
// same or different inventory items between the different warehouses. The
// same warehouse may be a source or a destination of multiple transfers.
//
// All transfers get processed at the same time. There is no guarantee of the
// order of transfers, so some of them may be rejected due to insufficient
// inventory in the "from" warehouse (even if there's another incoming transfer
// of this inventory item into this warehouse from another warehouse, since the
// transfer order is not guaranteed).
//
// This is an example of a versioned put, with an automatic retry in case of
// a collission: all transfers get kicked off at the same time; when the same
// warehouse is involved in multiple transfers, it's likely that multiple
// updates of the same object go out together, and one of them is going to
// get stasher::req_rejected_stat, in which case it's simply tried again.
// The versioned put makes sure that for a req_rejected_stat transaction, at
// least one participating object's version has changed.

// Some forward declarations

class xferinfoObj;
typedef x::ref<xferinfoObj> xferinfo;

int adjust(const inventoryptr &existing,
	   const std::string &objectname,

	   const std::string &what,
	   int howmuch,
	   const stasher::client::base::transaction &transaction);

void do_transfers(const stasher::client &client,
		  const warehouses_t &warehouses,
		  std::list<xferinfo> &transfers);


std::pair<std::string, int> do_adjust(const inventory &existing,
				      const std::string &what,
				      int howmuch);


// Information about a transfer: the name of the warehouses where something
// gets transferred from and to, what it is, and how many of them.

class xferinfoObj : virtual public x::obj {

public:
	std::string from, to;
	std::string what;
	int howmuch;

	// This is set by create_versioned_put if there's not enough
	// inventory to perform this transfer.
	bool isenough;

	// The processing status gets placed here

	stasher::req_stat_t processed;

	xferinfoObj(const std::string &fromArg,
		    const std::string &toArg,
		    const std::string &whatArg,
		    int howmuchArg) : from(fromArg), to(toArg),
				      what(whatArg), howmuch(howmuchArg),
				      isenough(true) // Be optimistic
	{
	}

	~xferinfoObj()
	{
	}

	std::string descr() const
	{
		std::ostringstream o;

		o << "transfer " << howmuch << " " << what
		  << " from " << from << " to " << to;
		return o.str();
	}
};

////////////////////////////////////////////////////////////////////////////
//
// Take our warehouses, and one transfer.
//
// Create a transaction effecting the transfer, and collect the original
// versions of each warehouse's inventory that were used to build the
// transaction that updates both warehouses' inventory objects.

std::pair<stasher::client::base::transaction,
	  stasher::versionscollected>
create_versioned_put(const warehouses_t &warehouses,
		     const xferinfo &xfer)
{
	// Lock the from and the to warehouse.

	// Note: in this example, create_versioned_put() always gets called
	// from the same thread. If this were a multithreaded context, we would
	// have to lock xfer->from and xfer->to in alphabetical order, in
	// order to avoid a potential deadlock against a thread that's doing
	// a transfer in the opposite "direction" (can be the same or a
	// different inventory item).
	//
	// But, since this is a single thread, this is ok. The manager
	// updates each current object one at a time, so there's no
	// possibility of a deadlock.
	//
	// Doing this in alphabetical order would've been ugly. Thankfully,
	// I don't have to do it.

	warehouse_t::base::current_value_t::lock
		from(warehouses->warehouses.find(xfer->from)
		     ->second->current_value);

	warehouse_t::base::current_value_t::lock
		to(warehouses->warehouses.find(xfer->to)
		   ->second->current_value);

	// Create a transaction, and collect the current versions of the
	// objects that go into the transaction.

	auto transaction=stasher::client::base::transaction::create();
	auto versions=stasher::versionscollected::base
		::create_from(from->value, to->value);

	if (adjust(from->value, xfer->from,
		   xfer->what, -xfer->howmuch, transaction) < 0)
	{
		// Not enough in the from inventory

		xfer->isenough=false;
	}
	else
	{
		adjust(to->value, xfer->to,
		       xfer->what, xfer->howmuch, transaction);
	}
	return std::make_pair(transaction, versions);
}

// Apply a transfer to an inventory, and update the transaction object,
// accordingly.

// Returns the new inventory level of the selected item.

int adjust(const inventoryptr &existing,
	   const std::string &objectname,

	   const std::string &what,
	   int howmuch,
	   const stasher::client::base::transaction &transaction)
{
	if (existing.null())
	{
		// New inventory object

		inventory dummy=inventory::create();

		auto result=do_adjust(dummy, what, howmuch);

		if (result.first.size() == 0)
		{
			// Marginal: no inventory before and after. We give up.

			return result.second;
		}

		transaction->newobj(objectname, result.first);
		return result.second;
	}

	auto result=do_adjust(existing, what, howmuch);

	if (result.first.size() == 0) // Empty inventory!
	{
		transaction->delobj(objectname, existing->uuid);
	}
	else
	{
		transaction->updobj(objectname, existing->uuid, result.first);
	}

	return result.second;
}

// Ok, the task is now reduced to taking this inventory object, updating
// the inventory level, and then serializing it back, and returning the
// new level.

std::pair<std::string, int> do_adjust(const inventory &existing,
				      const std::string &what,
				      int howmuch)
{
	// Clone the object

	auto cpy=inventory::create(*existing);

	// Find this object in the inventory map.

	auto iter=cpy->stock.find(what);

	if (iter == cpy->stock.end())
	{
		// Doesn't exist, create it.
		iter=cpy->stock.insert(std::make_pair(what, 0)).first;
	}

	iter->second += howmuch;

	std::pair<std::string, int> ret;

	ret.second=iter->second;

	// Inventory of 0 removes this item from the inventory, completely.

	if (ret.second == 0)
		cpy->stock.erase(iter);

	// Return an empty string if the inventory is empty. This results
	// in the object getting deleted.

	if (!cpy->stock.empty())
	{
		typedef std::back_insert_iterator<std::string> insert_iter_t;

		insert_iter_t insert_iter(ret.first);

		x::serialize::iterator<insert_iter_t> ser_iter(insert_iter);

		cpy->serialize(ser_iter);
	}
	return ret;
}

void xferinventory(int argc, char **argv)
{
	// The list of transfers parsed from the command line.

	std::list<xferinfo> transfers;

	// All the warehouses elicited from the transfers, combined:
	warehouses_t warehouses=warehouses_t::create();

	// Parse command line options.

	for (int i=1; i+3 < argc; i += 4)
	{
		int n=0;

		std::istringstream(argv[i+3]) >> n;

		if (n <= 0)
		{
			std::cerr << "Eh?" << std::endl;
			return;
		}

		auto xferinfo=xferinfo::create(argv[i], argv[i+1],
					       argv[i+2], n);

		warehouses->createwarehouse(xferinfo->from);
		warehouses->createwarehouse(xferinfo->to);
		transfers.push_back(xferinfo);
	}

	auto client=stasher::client::base::connect();
	auto manager=stasher::manager::create();

	// Load the existing inventory, start subscriptions

	std::list<x::ref<x::obj> > mcguffins;

	load_warehouses(warehouses, client, manager, mcguffins);

	std::cout << "Transfering between:" << std::endl;

	warehouses->inventory();

	// Perform the transfers. If any of them where req_rejected_stat-ed,
	// repeat them.

	do
	{
		// Submit the transfers, wait for them to get processed.

		do_transfers(client, warehouses, transfers);

		for (auto b=transfers.begin(), e=transfers.end(), p=b;
		     b != e; )
		{
			p=b;
			++b;

			if ((*p)->processed != stasher::req_rejected_stat)
				transfers.erase(p);
		}
	} while (!transfers.empty());
}

void do_transfers(const stasher::client &client,
		  const warehouses_t &warehouses,
		  std::list<xferinfo> &transfers)
{
	// We could very well call versioned_put() inside the first for loop.
	// But, for this example to demonstrate req_rejected_stat handling,
	// we must go out of our to cause a version conflict.
	//
	// We'll create all transactions before processing them. This way,
	// when there are multiple transactions that hit the same warehouse,
	// there will be a req_rejected_stat.

	class transaction_list {

	public:
		stasher::client::base::transaction tran;
		stasher::versionscollected vers;
		xferinfo xfer;

		transaction_list(const stasher::client::base::transaction
				 &tranArg,
				 const stasher::versionscollected &versArg,
				 const xferinfo &xferArg)
			: tran(tranArg), vers(versArg), xfer(xferArg)
		{
		}
	};

	std::vector<transaction_list> transactions;

	for (auto &transfer : transfers)
	{
		// Be optimistic.
		transfer->processed=stasher::req_processed_stat;

		if (transfer->from == transfer->to)
		{
			std::cout << "Very funny: " << transfer->descr()
				  << std::endl;
			continue;
		}

		std::pair<stasher::client::base::transaction,
			  stasher::versionscollected>
			transaction=create_versioned_put(warehouses, transfer);

		if (!transfer->isenough)
		{
			std::cout << "Insufficient inventory: "
				  << transfer->descr()
				  << std::endl;
			continue;
		}

		transactions.emplace_back(transaction.first, transaction.second,
					  transfer);
	}

	for (auto &t : transactions)
	{
		std::cout << "Processing: " << t.xfer->descr() << std::endl;

		stasher::putresults res=
			stasher::versioned_put(client, t.tran,
					       t.vers);

		std::cout << x::tostring(res->status)
			  << ": "
			  << t.xfer->descr()
			  << std::endl;

		t.xfer->processed=res->status;
	}
}

int main(int argc, char **argv)
{
	try {
		xferinventory(argc, argv);
	} catch (const x::exception &e)
	{
		std::cerr << e << std::endl;
		return 1;
	}
	return 0;
}

xferinventory takes a list of transfers on the command line. Each transfer consists of the name of the from warehouse object, the name of the to warehouse object, the name of an inventory item, and a positive count. xferinventory updates both warehouse objects, adjusting each one's inventory accordingly. xferinventory accepts more than one transfer, and does them all. Here's the output of transferring four bananas from east to west, and four apples from west to east:

$ ./xferinventory east west bananas 4 west east apples 4
Waiting for objects to get loaded
Transfering between:
east:
    Item                                Count
    ------------------------------   --------
    apples                                  2
    bananas                                 8
===========================================================================
west:
    Item                                Count
    ------------------------------   --------
    apples                                  8
    bananas                                 2
===========================================================================
Processing: transfer 4 bananas from east to west
Transaction/request processed: transfer 4 bananas from east to west
Processing: transfer 4 apples from west to east
Collision detected - object uuid mismatch: transfer 4 apples from west to east
Processing: transfer 4 apples from west to east
Transaction/request processed: transfer 4 apples from west to east

In xferinventory.C, create_versioned_put() acquires a lock on both warehouse objects, collects both objects' version mcguffins into a stasher::versionscollected, then prepares a transaction to update those objects, and releases the lock.

do_transfers() processes the transactions. They all get purposefully created in advance, so that all transactions get created for the initial contents of the warehouse objects. stasher::versioned_put() then processes each transaction, one at a time. This guarantees a stasher::req_rejected_stat when the same warehouse is a part of more than one transfer, as in this example.

stasher::versioned_put() takes a client connection handle, a transaction object, and a stasher::versionscollected. This synchronous function waits for the transaction to get processed, then returns its stasher::putresults. Additionally, if the transaction's status is stasher::req_rejected_stat, stasher::versioned_put() waits until at least one of the versioned mcguffins goes out of scope and gets destroyed, indicating that its versioned object has been updated.

The above example simply proceeds and immediately reprocesses all transfers that failed with a stasher::req_rejected_stat, using the updated contents of the warehouse objects as a new starting point, for this go-around.

Note

All current object value locks must be released before calling stasher::versioned_put(). The correct sequence of events is:

  1. Lock all current_values, in a consistent order, to avoid deadlocks.

  2. Collect each value's version mcguffin, into a stasher::versionscollected, and prepare a transaction updating the same objects.

  3. Release current_value locks.

  4. Call stasher::versioned_put().

In the event of the collision, stasher::versioned_put() waits until one of the versioned objects gets updated. The callbacks that update versioned current objects acquire their own lock on their current_value, resulting in a deadlock unless all locks get released prior to a stasher::versioned_put().

stasher::versioned_put_request()

xferinventory2.C is a version of xferinventory.C that demonstrates how to use stasher::versioned_put_request(), an asynchronous version of stasher::versioned_put(). stasher::versioned_put() takes client connection handle, a transaction object, a functor, and a stasher::versionscollected. The client connection handle, the transaction object, and the stasher::versionscollected serve the same purpose as with stasher::versioned_put().

stasher::versioned_put_request() returns immediately. The functor gets invoked, with a stasher::putresults parameter, when the transaction gets processed. Just like with stasher::versioned_put(), if the transaction's status is stasher::req_rejected_stat, the functor does not get invoked until at least one of the versioned mcguffins goes out of scope and gets destroyed, indicating that its versioned object has been updated.

$ ./xferinventory2 east west apples 2 east west bananas 1 west east apples 1 west east bananas 2
Waiting for objects to get loaded
Transfering between:
east:
    Item                                Count
    ------------------------------   --------
    apples                                  6
    bananas                                 4
===========================================================================
west:
    Item                                Count
    ------------------------------   --------
    apples                                  4
    bananas                                 6
===========================================================================
Processing: transfer 2 apples from east to west
Processing: transfer 1 bananas from east to west
Processing: transfer 1 apples from west to east
Processing: transfer 2 bananas from west to east
Transaction/request processed: transfer 2 apples from east to west
Collision detected - object uuid mismatch: transfer 1 apples from west to east
Collision detected - object uuid mismatch: transfer 1 bananas from east to west
Collision detected - object uuid mismatch: transfer 2 bananas from west to east
Processing: transfer 1 bananas from east to west
Processing: transfer 1 apples from west to east
Processing: transfer 2 bananas from west to east
Transaction/request processed: transfer 1 bananas from east to west
Collision detected - object uuid mismatch: transfer 1 apples from west to east
Collision detected - object uuid mismatch: transfer 2 bananas from west to east
Processing: transfer 1 apples from west to east
Processing: transfer 2 bananas from west to east
Transaction/request processed: transfer 1 apples from west to east
Collision detected - object uuid mismatch: transfer 2 bananas from west to east
Processing: transfer 2 bananas from west to east
Transaction/request processed: transfer 2 bananas from west to east

xferinventory2.C does not prepare all the transactions in advance, before processing them. Since they're asynchronous, xferinventory2.C starts them all together, then waits until they've all been processed, before redoing all the stasher::req_rejected_stat failures:

Note

The requirement to release all versioned current object locks, mentioned in xferinventory.C is slightly relaxed due to stasher::versioned_put_request()'s asynchronous nature, as long as the locks are released shortly after stasher::versioned_put_request returns.

Because of that, two convenience templates are also available. stasher::versioned_put_request_from is an alternative version where the last stasher::versionscollected parameter gets replaced by a variadic list of stasher::versionedptrs (or something similar that implements a suitable getversion()); and stasher::versioned_put_request_fromiter where the last stasher::versionscollected parameter gets replaced by a beginning and an ending iterator, over stasher::versionedptrs.

These alternative functions construct a stasher::versionscollected from their custom parameters, then invoke stasher::versioned_put_request().