Distributed PostgreSQL as an extension

Overview

What is Citus?

Citus is a PostgreSQL extension that transforms Postgres into a distributed database—so you can achieve high performance at any scale.

With Citus, you extend your PostgreSQL database with new superpowers:

  • Distributed tables are sharded across a cluster of PostgreSQL nodes to combine their CPU, memory, storage and I/O capacity.
  • Reference tables are replicated to all nodes for joins and foreign keys from distributed tables and maximum read performance.
  • Distributed query engine routes and parallelizes SELECT, DML, and other operations on distributed tables across the cluster.
  • Columnar storage compresses data, speeds up scans, and supports fast projections, both on regular and distributed tables.

You can use these Citus superpowers to make your Postgres database scale-out ready on a single Citus node. Or you can build a large cluster capable of handling high transaction throughputs, especially in multi-tenant apps, run fast analytical queries, and process large amounts of time series or IoT data for real-time analytics. When your data size and volume grow, you can easily add more worker nodes to the cluster and rebalance the shards.

Our SIGMOD '21 paper Citus: Distributed PostgreSQL for Data-Intensive Applications gives a more detailed look into what Citus is, how it works, and why it works that way.

Citus scales out from a single node

Since Citus is an extension to Postgres, you can use Citus with the latest Postgres versions. And Citus works seamlessly with the PostgreSQL tools and extensions you are already familiar with.

Why Citus?

Developers choose Citus for two reasons:

  1. Your application is outgrowing a single PostgreSQL node

If the size and volume of your data increases over time, you may start seeing any number of performance and scalability problems on a single PostgreSQL node. For example: High CPU utilization and I/O wait times slow down your queries, SQL queries return out of memory errors, autovacuum cannot keep up and increases table bloat, etc.

With Citus you can distribute and optionally compress your tables to always have enough memory, CPU, and I/O capacity to achieve high performance at scale. The distributed query engine can efficiently route transactions across the cluster, while parallelizing analytical queries and batch operations across all cores. Moreover, you can still use the PostgreSQL features and tools you know and love.
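
As a minimal sketch of that workflow (the page_views table and tenant_id column are illustrative, and the exact steps may vary by Citus version), distributing and then compressing a table looks like this:

-- shard the table across the cluster
SELECT create_distributed_table('page_views', 'tenant_id');

-- optionally compress it by switching to the columnar access method
SELECT alter_table_set_access_method('page_views', 'columnar');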

  2. PostgreSQL can do things other systems can’t

There are many data processing systems that are built to scale out, but few have as many powerful capabilities as PostgreSQL, including: Advanced joins and subqueries, user-defined functions, update/delete/upsert, constraints and foreign keys, powerful extensions (e.g. PostGIS, HyperLogLog), many types of indexes, time-partitioning, and sophisticated JSON support.

Citus makes PostgreSQL’s most powerful capabilities work at any scale, allowing you to handle complex data-intensive workloads on a single database system.

Getting Started

The quickest way to get started with Citus is to use the Hyperscale (Citus) deployment option in the Azure Database for PostgreSQL managed service—or set up Citus locally.

Hyperscale (Citus) on Azure Database for PostgreSQL

You can get a fully-managed Citus cluster in minutes through the Hyperscale (Citus) deployment option in the Azure Database for PostgreSQL portal. Azure will manage your backups, high availability through auto-failover, software updates, monitoring, and more for all of your servers. To get started with Hyperscale (Citus), use the Hyperscale (Citus) Quickstart in the Azure docs.

Running Citus using Docker

The smallest possible Citus cluster is a single PostgreSQL node with the Citus extension, which means you can try out Citus by running a single Docker container.

# run PostgreSQL with Citus on port 5500
docker run -d --name citus -p 5500:5432 -e POSTGRES_PASSWORD=mypassword citusdata/citus

# connect using psql within the Docker container
docker exec -it citus psql -U postgres

# or, connect using local psql
psql -U postgres -d postgres -h localhost -p 5500

Install Citus locally

If you already have a local PostgreSQL installation, the easiest way to install Citus is to use our packaging repo.

Install packages on Ubuntu / Debian:

curl https://install.citusdata.com/community/deb.sh > add-citus-repo.sh
sudo bash add-citus-repo.sh
sudo apt-get -y install postgresql-14-citus-10.2

Install packages on CentOS / Fedora / Red Hat:

curl https://install.citusdata.com/community/rpm.sh > add-citus-repo.sh
sudo bash add-citus-repo.sh
sudo yum install -y citus102_14

To add Citus to your local PostgreSQL database, add the following to postgresql.conf:

shared_preload_libraries = 'citus'
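
If you prefer to change settings via SQL, the same configuration can usually be applied with ALTER SYSTEM (a sketch; the setting only takes effect after a restart, and any libraries already listed in shared_preload_libraries need to be kept in the list):

ALTER SYSTEM SET shared_preload_libraries = 'citus';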

After restarting PostgreSQL, connect using psql and run:

CREATE EXTENSION citus;

You’re now ready to get started and use Citus tables on a single node.

Install Citus on multiple nodes

If you want to set up a multi-node cluster, you can also set up additional PostgreSQL nodes with the Citus extension and add them to form a Citus cluster:

-- before adding the first worker node, tell future worker nodes how to reach the coordinator
SELECT citus_set_coordinator_host('10.0.0.1', 5432);

-- add worker nodes
SELECT citus_add_node('10.0.0.2', 5432);
SELECT citus_add_node('10.0.0.3', 5432);

-- rebalance the shards over the new worker nodes
SELECT rebalance_table_shards();

For more details, see our documentation on how to set up a multi-node Citus cluster on various operating systems.
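
Once the nodes are added, a quick sanity check is to ask the coordinator which workers it knows about, for example with the metadata function below (available in recent Citus releases):

SELECT * FROM citus_get_active_worker_nodes();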

Using Citus

Once you have your Citus cluster, you can start creating distributed tables and reference tables, and using columnar storage.

Creating Distributed Tables

The create_distributed_table UDF will transparently shard your table locally or across the worker nodes:

CREATE TABLE events (
  device_id bigint,
  event_id bigserial,
  event_time timestamptz default now(),
  data jsonb not null,
  PRIMARY KEY (device_id, event_id)
);

-- distribute the events table across shards placed locally or on the worker nodes
SELECT create_distributed_table('events', 'device_id');

After this operation, queries for a specific device ID will be efficiently routed to a single worker node, while queries across device IDs will be parallelized across the cluster.

-- insert some events
INSERT INTO events (device_id, data)
SELECT s % 100, ('{"measurement":'||random()||'}')::jsonb FROM generate_series(1,1000000) s;

-- get the last 3 events for device 1, routed to a single node
SELECT * FROM events WHERE device_id = 1 ORDER BY event_time DESC, event_id DESC LIMIT 3;
┌───────────┬──────────┬───────────────────────────────┬───────────────────────────────────────┐
│ device_id │ event_id │          event_time           │                 data                  │
├───────────┼──────────┼───────────────────────────────┼───────────────────────────────────────┤
│         1 │  1999901 │ 2021-03-04 16:00:31.189963+00 │ {"measurement": 0.88722643925054}     │
│         1 │  1999801 │ 2021-03-04 16:00:31.189963+00 │ {"measurement": 0.6512231304621992}   │
│         1 │  1999701 │ 2021-03-04 16:00:31.189963+00 │ {"measurement": 0.019368766051897524} │
└───────────┴──────────┴───────────────────────────────┴───────────────────────────────────────┘
(3 rows)

Time: 4.588 ms

-- explain plan for a query that is parallelized across shards, which shows the plan for
-- a query on one of the shards and how the aggregation across shards is done
EXPLAIN (VERBOSE ON) SELECT count(*) FROM events;
┌────────────────────────────────────────────────────────────────────────────────────┐
│                                     QUERY PLAN                                     │
├────────────────────────────────────────────────────────────────────────────────────┤
│ Aggregate                                                                          │
│   Output: COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)       │
│   ->  Custom Scan (Citus Adaptive)                                                 │
│         ...                                                                        │
│         ->  Task                                                                   │
│               Query: SELECT count(*) AS count FROM events_102008 events WHERE true │
│               Node: host=localhost port=5432 dbname=postgres                       │
│               ->  Aggregate                                                        │
│                     ->  Seq Scan on public.events_102008 events                    │
└────────────────────────────────────────────────────────────────────────────────────┘

Creating Distributed Tables with Co-location

Distributed tables that have the same distribution column can be co-located to enable high performance distributed joins and foreign keys between distributed tables. By default, distributed tables will be co-located based on the type of the distribution column, but you can define co-location explicitly with the colocate_with argument in create_distributed_table.

CREATE TABLE devices (
  device_id bigint primary key,
  device_name text,
  device_type_id int
);
CREATE INDEX ON devices (device_type_id);

-- co-locate the devices table with the events table
SELECT create_distributed_table('devices', 'device_id', colocate_with := 'events');

-- insert device metadata
INSERT INTO devices (device_id, device_name, device_type_id)
SELECT s, 'device-'||s, 55 FROM generate_series(0, 99) s;

-- optionally: make sure the application can only insert events for a known device
ALTER TABLE events ADD CONSTRAINT device_id_fk
FOREIGN KEY (device_id) REFERENCES devices (device_id);

-- get the average measurement across all devices of type 55, parallelized across shards
SELECT avg((data->>'measurement')::double precision)
FROM events JOIN devices USING (device_id)
WHERE device_type_id = 55;

┌────────────────────┐
│        avg         │
├────────────────────┤
│ 0.5000191877513974 │
└────────────────────┘
(1 row)

Time: 209.961 ms

Co-location also helps you scale INSERT..SELECT, stored procedures, and distributed transactions.
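
As a sketch of a co-located INSERT..SELECT (the device_event_counts rollup table is hypothetical): because the source and target share the distribution column, Citus can push the whole statement down to the workers instead of pulling rows through the coordinator.

CREATE TABLE device_event_counts (
  device_id bigint primary key,
  event_count bigint
);
SELECT create_distributed_table('device_event_counts', 'device_id', colocate_with := 'events');

-- runs shard-by-shard on the workers, since events and device_event_counts are co-located
INSERT INTO device_event_counts
SELECT device_id, count(*) FROM events GROUP BY device_id
ON CONFLICT (device_id) DO UPDATE SET event_count = EXCLUDED.event_count;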

Creating Reference Tables

When you need fast joins or foreign keys that do not include the distribution column, you can use create_reference_table to replicate a table across all nodes in the cluster.

CREATE TABLE device_types (
  device_type_id int primary key,
  device_type_name text not null unique
);

-- replicate the table across all nodes to enable foreign keys and joins on any column
SELECT create_reference_table('device_types');

-- insert a device type
INSERT INTO device_types (device_type_id, device_type_name) VALUES (55, 'laptop');

-- optionally: make sure the application can only insert devices with known types
ALTER TABLE devices ADD CONSTRAINT device_type_fk
FOREIGN KEY (device_type_id) REFERENCES device_types (device_type_id);

-- get the last 3 events for devices whose type name starts with laptop, parallelized across shards
SELECT device_id, event_time, data->>'measurement' AS value, device_name, device_type_name
FROM events JOIN devices USING (device_id) JOIN device_types USING (device_type_id)
WHERE device_type_name LIKE 'laptop%' ORDER BY event_time DESC LIMIT 3;

┌───────────┬───────────────────────────────┬─────────────────────┬─────────────┬──────────────────┐
│ device_id │          event_time           │        value        │ device_name │ device_type_name │
├───────────┼───────────────────────────────┼─────────────────────┼─────────────┼──────────────────┤
│        60 │ 2021-03-04 16:00:31.189963+00 │ 0.28902084163415864 │ device-60   │ laptop           │
│         8 │ 2021-03-04 16:00:31.189963+00 │ 0.8723803076285073  │ device-8    │ laptop           │
│        20 │ 2021-03-04 16:00:31.189963+00 │ 0.8177634801548557  │ device-20   │ laptop           │
└───────────┴───────────────────────────────┴─────────────────────┴─────────────┴──────────────────┘
(3 rows)

Time: 146.063 ms

Reference tables enable you to scale out complex data models and take full advantage of relational database features.

Creating Tables with Columnar Storage

To use columnar storage in your PostgreSQL database, all you need to do is add USING columnar to your CREATE TABLE statements and your data will be automatically compressed using the columnar access method.

CREATE TABLE events_columnar (
  device_id bigint,
  event_id bigserial,
  event_time timestamptz default now(),
  data jsonb not null
)
USING columnar;

-- insert some data
INSERT INTO events_columnar (device_id, data)
SELECT d, '{"hello":"columnar"}' FROM generate_series(1,10000000) d;

-- create a row-based table to compare
CREATE TABLE events_row AS SELECT * FROM events_columnar;

-- see the huge size difference!
\d+
                                          List of relations
┌────────┬──────────────────────────────┬──────────┬───────┬─────────────┬────────────┬─────────────┐
│ Schema │             Name             │   Type   │ Owner │ Persistence │    Size    │ Description │
├────────┼──────────────────────────────┼──────────┼───────┼─────────────┼────────────┼─────────────┤
│ public │ events_columnar              │ table    │ marco │ permanent   │ 25 MB      │             │
│ public │ events_row                   │ table    │ marco │ permanent   │ 651 MB     │             │
└────────┴──────────────────────────────┴──────────┴───────┴─────────────┴────────────┴─────────────┘
(2 rows)

You can use columnar storage by itself, or in a distributed table to combine the benefits of compression and the distributed query engine.
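
For instance, the columnar table created above could itself be distributed, which shards the compressed data across the cluster (a sketch, reusing the events_columnar table from the previous example):

SELECT create_distributed_table('events_columnar', 'device_id');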

When using columnar storage, you should only load data in batch using COPY or INSERT..SELECT to achieve good compression. Update, delete, and foreign keys are currently unsupported on columnar tables. However, you can use partitioned tables in which newer partitions use row-based storage, and older partitions are compressed using columnar storage.
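
A minimal sketch of that partitioning pattern, using hypothetical table and partition names; alter_table_set_access_method is the Citus UDF that converts a table (or partition) to columnar in place:

CREATE TABLE events_partitioned (
  device_id bigint,
  event_time timestamptz not null,
  data jsonb
) PARTITION BY RANGE (event_time);

-- the newest partition stays row-based so it can receive updates and deletes
CREATE TABLE events_2021_03 PARTITION OF events_partitioned
  FOR VALUES FROM ('2021-03-01') TO ('2021-04-01');

-- an older partition is compressed with columnar storage
CREATE TABLE events_2021_02 PARTITION OF events_partitioned
  FOR VALUES FROM ('2021-02-01') TO ('2021-03-01');
SELECT alter_table_set_access_method('events_2021_02', 'columnar');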

To learn more about columnar storage, check out the columnar storage README.

Documentation

If you’re ready to get started with Citus or want to know more, we recommend reading the Citus open source documentation. Or, if you are using Citus on Azure, then the Hyperscale (Citus) documentation is online and available as part of the Azure Database for PostgreSQL docs.

Our Citus docs contain comprehensive use case guides on how to build a multi-tenant SaaS application, real-time analytics dashboard, or work with time series data.

Architecture

A Citus database cluster grows from a single PostgreSQL node into a cluster by adding worker nodes. In a Citus cluster, the original node to which the application connects is referred to as the coordinator node. The Citus coordinator contains both the metadata of distributed tables and reference tables, as well as regular (local) tables, sequences, and other database objects (e.g. foreign tables).

Data in distributed tables is stored in “shards”, which are actually just regular PostgreSQL tables on the worker nodes. When querying a distributed table on the coordinator node, Citus will send regular SQL queries to the worker nodes. That way, all the usual PostgreSQL optimizations and extensions can automatically be used with Citus.
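
To see this mapping for yourself, the citus_shards metadata view (available in recent Citus versions) lists the regular Postgres tables that back each shard and the node they live on, for example:

SELECT table_name, shardid, nodename, nodeport, shard_size
FROM citus_shards
WHERE table_name = 'events'::regclass;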

Citus architecture

When you send a query in which all (co-located) distributed tables have the same filter on the distribution column, Citus will automatically detect that and send the whole query to the worker node that stores the data. That way, arbitrarily complex queries are supported with minimal routing overhead, which is especially useful for scaling transactional workloads. If queries do not have a specific filter, each shard is queried in parallel, which is especially useful in analytical workloads. The Citus distributed executor is adaptive and is designed to handle both query types at the same time on the same system under high concurrency, which enables large-scale mixed workloads.
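
For example, rerunning the earlier count with a distribution-column filter is a quick way to see routing in action; the EXPLAIN output should show a single task sent to one worker, in contrast to the parallel fan-out shown above (a sketch using the events table from this README):

EXPLAIN SELECT count(*) FROM events WHERE device_id = 1;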

When to use Citus

Citus is uniquely capable of scaling both analytical and transactional workloads with up to petabytes of data. Use cases in which Citus is commonly used:

  • Customer-facing analytics dashboards: Citus enables you to build analytics dashboards that simultaneously ingest and process large amounts of data in the database and give sub-second response times even with a large number of concurrent users.

    The advanced parallel, distributed query engine in Citus combined with PostgreSQL features such as array types, JSONB, lateral joins, and extensions like HyperLogLog and TopN allow you to build responsive analytics dashboards no matter how many customers or how much data you have.

    Example real-time analytics users: Algolia, Heap

  • Time series data: Citus enables you to process and analyze very large amounts of time series data. The biggest Citus clusters store well over a petabyte of time series data and ingest terabytes per day.

    Citus integrates seamlessly with Postgres table partitioning and pg_partman, which can speed up queries and writes on time series tables. You can take advantage of Citus’s parallel, distributed query engine for fast analytical queries, and use the built-in columnar storage to compress old partitions.

    Example users: MixRank, Windows team

  • Software-as-a-service (SaaS) applications: SaaS and other multi-tenant applications need to be able to scale their database as the number of tenants/customers grows. Citus enables you to transparently shard a complex data model by the tenant dimension, so your database can grow along with your business.

    By distributing tables along a tenant ID column and co-locating data for the same tenant, Citus can horizontally scale complex (tenant-scoped) queries, transactions, and foreign key graphs. Reference tables and distributed DDL commands make database management a breeze compared to manual sharding. On top of that, you have a built-in distributed query engine for doing cross-tenant analytics inside the database.

    Example multi-tenant SaaS users: Copper, Salesloft, ConvertFlow

  • Geospatial: Because of the powerful PostGIS extension to Postgres that adds support for geographic objects into Postgres, many people run spatial/GIS applications on top of Postgres. And since spatial location information has become part of our daily life, well, there are more geospatial applications than ever. When your Postgres database needs to scale out to handle an increased workload, Citus is a good fit.

    Example geospatial users: Helsinki Regional Transportation Authority (HSL), MobilityDB.

Need Help?

Contributing

Citus is built on and of open source, and we welcome your contributions. The CONTRIBUTING.md file explains how to get started developing the Citus extension itself and our code quality guidelines.

Stay Connected

  • Twitter: Follow us @citusdata to track the latest posts & updates on what’s happening.
  • Citus Blog: Read our popular Citus Blog for useful & informative posts about PostgreSQL and Citus.
  • Citus Newsletter: Subscribe to our monthly technical Citus Newsletter to get a curated collection of our favorite posts, videos, docs, talks, & other Postgres goodies.
  • Slack: Our Citus Public slack is a good way to stay connected, not just with us but with other Citus users.
  • Sister Blog: Read our Azure Database for PostgreSQL sister blog on Microsoft TechCommunity for posts relating to Postgres (and Citus) on Azure.
  • Videos: Check out this YouTube playlist of some of our favorite Citus videos and demos. If you want to deep dive into how Citus extends PostgreSQL, you might want to check out Marco Slot’s talk at Carnegie Mellon titled Citus: Distributed PostgreSQL as an Extension that was part of Andy Pavlo’s Vaccination Database Talks series at CMUDB.
  • Our other Postgres projects: Our team also works on other awesome PostgreSQL open source extensions & projects, including: pg_cron, HyperLogLog, TopN, pg_auto_failover, activerecord-multi-tenant, and django-multitenant.

Copyright © Citus Data, Inc.

Comments
  • For Review: Fix 581 and 582

    This Pull Request implements basic support for both #581 and #582. I realize we had previously discussed making this two separate Pull Requests, but the bulk of the work for this is shared in a new function in relay_event_utility.c#AppendShardIdToRelationReferences, and it just didn't make sense to me to try and split that into two separate functions.

    This gets very minimal integration between @ZomboDB and Citus working by allowing these two types of statements to work correctly:

    CREATE INDEX idxchanges ON wikipedia_changes 
       USING zombodb (zdb('wikipedia_changes'::regclass, ctid), zdb(wikipedia_changes.*)) 
       WITH (url='http://localhost:9200/')
    

    and

    SELECT * FROM wikipedia_changes WHERE zdb('wikipedia_changes', ctid) ==> 'mario';
    

    The latter successfully distributes the query to each worker node/shard, and the resulting queries properly plan an Index Scan.

    I've limited the scope of rewriting ::regclass references to only those that are arguments to function calls and further, to only those that are quals of the query. No targetlist, orderby, groupby, etc, support because @ZomboDB doesn't need it.

    It's not exactly clear to me how we should test this in Citus' regression test suite, so I'm open to thoughts there.

    Please let me know what else I should do -- I'm happy to do it.

    Review Tasks

    • [x] Rename struct to ColumnRefWalkerState
    • [x] Change walker name to UpdateWholeRowColumnReferencesWalker
    • [x] Document walker with function comment
    • [x] Add bool walkIsComplete to walker, change to if/else if/else blocks with return walkIsComplete outside all blocks at end
    • [x] Change extension logic to extend the penultimate field string element iff the last element is A_Star
    • [x] Remove AppendShardIdToRowReferences; inline its logic
    • [x] Add an else if case inside of get_variable to detect Citus RTE nodes (using GetRangeTblKind) rather than modifying the else statement
    • [x] Add tests
      • [x] In multi_index_statements.sql, create an expression index (in the CREATE INDEX section) that uses record_ne(rel.*, NULL) on a distributed table. This is a useless index, but will test your deparse logic and uses a record-taking IMMUTABLE function without us having to make one at test-time
      • [x] Toward the end of multi_simple_queries.sql, add a SELECT query that uses tableoid in the WHERE clause. Maybe something silly like tableoid IS NOT NULL. Maybe include it in the targetlist too.
    • [x] Ensure your branch is up-to-date with the latest changes in master (git rebase)
    CLA Signed 
    opened by eeeebbbbrrrr 58
  • Get Citus compiling and passing on PostgreSQL 10

    There is still some room for improvement in these changes (in particular, I think a lot of the places we test PG_VERSION_NUM should be abstracted out and cleaned up in all C files), but everything compiles and passes.

    PostgreSQL 10 changes the output of psql's \d, but we use it in so many places (over 170), that I didn't want to create test variants for every file using it. @anarazel suggested rewriting all tests that use it, and I have an SQL alternative to describing a table, but there are still tests which require foreign key, constraint, or index outputs, which I have yet to polyfill.

    So for now (and this is temporary), I've locked the psql version to 9.6 to get the old behavior (though you won't see that here, it's in the citusdata/tools branch this code uses to build on PostgreSQL 10).

    needs review 
    opened by jasonmp85 32
  • Maintenance daemon PANIC after upgrade to 8.0

    Summary: Upgrading our Centos system from citus72_10.x86_64 to citus80_10.x86_64. Our unit tests are multi-process and fairly demanding. After the upgrade, we get intermittent failures, with the cluster going into recovery mode during the tests. Investigation shows the following on the coordinator postgres log:

    2018-11-30 20:58:11.542 UTC [11035] LOG:  starting maintenance daemon on database 4605651 user 10
    2018-11-30 20:58:11.542 UTC [11035] CONTEXT:  Citus maintenance daemon for database 4605651 user 10
    2018-11-30 20:58:13.207 UTC [7188] WARNING:  there is no transaction in progress
    2018-11-30 20:58:13.254 UTC [7101] FATAL:  terminating connection due to administrator command
    2018-11-30 20:58:13.254 UTC [7101] CONTEXT:  Citus maintenance daemon for database 4552444 user 10
    2018-11-30 20:58:13.257 UTC [4706] LOG:  worker process: Citus Maintenance Daemon: 4552444/10 (PID 7101) exited with exit code 1
    2018-11-30 20:58:13.517 UTC [4738] LOG:  checkpoint starting: immediate force wait
    2018-11-30 20:58:13.828 UTC [4738] LOG:  checkpoint complete: wrote 3124 buffers (0.1%); 0 WAL file(s) added, 0 removed, 1 recycled; write=0.221 s, sync=0.083 s, total=0.311 s; sync files=1009, longest=0.007 s, average=0.000 s; distance=21831 kB, estimate=21831 kB
    2018-11-30 21:00:33.793 UTC [11035] PANIC:  stuck spinlock detected at GetBackendDataForProc, transaction/backend_data.c:798
    2018-11-30 21:00:33.793 UTC [11035] CONTEXT:  Citus maintenance daemon for database 4605651 user 10
    2018-11-30 21:00:33.795 UTC [4706] LOG:  worker process: Citus Maintenance Daemon: 4605651/10 (PID 11035) was terminated by signal 6: Aborted
    2018-11-30 21:00:33.795 UTC [4706] LOG:  terminating any other active server processes
    2018-11-30 21:00:33.796 UTC [11272] WARNING:  terminating connection because of crash of another server process
    2018-11-30 21:00:33.796 UTC [11272] DETAIL:  The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
    2018-11-30 21:00:33.796 UTC [11272] HINT:  In a moment you should be able to reconnect to the database and repeat your command.
    2018-11-30 21:00:33.796 UTC [10874] WARNING:  terminating connection because of crash of another server process
    2018-11-30 21:00:33.796 UTC [10874] DETAIL:  The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
    

    The system is Centos 7, 16 vCPUs, 128G RAM (about 6G in use during tests); single-node system with coordinator and worker on same node (32 shards); Postgres version shows 10.5-1PGDG.rhel7. It doesn't appear to be resource starvation (lsof shows plenty of headroom on open file handles). Running under Citus 7.2 was solid; we may try 7.5 to help segment the issue. The unit tests create all the data required, so this isn't a "data upgrade" issue; occasionally, the entire suite of tests pass.

    opened by cyxgun 30
  • A proposal for Improving the Safety of Extension Upgrades

    It being well-known that updates require two parts; loading of the shared-library and updating of the extension; and understanding that these independent operations, despite their independence, require a certain ordering lest Postgres experience some error, or worse, the sufferance of segfault; I do humbly offer it to consideration that this unfortunate potential be reduced through such means as will be briefly outlined.

    Firstly, that the shared-object, the industrious engine of Magick powering Citus, shall be provided some means of determining its version.

    Secondly, Postgres providing no mechanism for the ordering of version numbers, preferring instead to wander through the Graph of versions by means of paths it dynamically constructs, that the Regex \d+\.\d+-\d+ (ex. 6.0-13) shall be strictly enforced and the shared-object taught a total ordering of versions using the obvious piecewise-numeric method.

    Thirdly, that the upgrade-scripts, which currently operate much like Goethe's famous broomstick: without any regard for Expediency, shall be given such self-awareness as required to prevent the aforementioned segfaults, namely: that they shall refuse to run unless the shared-object reports being of a greater or equal version than the script itself.

    I have duly submitted a request-for-pull, recorded here, and hope the remainder of these tasks may be soon addressed.

    Needs Documentation 
    opened by lithp 30
  • Mitmproxy-based automated failure testing

    Some slides explaining how this works: https://docs.google.com/presentation/d/1zxF32GvcFJp0s6UClL39hla9WqhbGvYgo30e2WYnADQ/edit?usp=sharing

    A modernized #2044. This one works at the network level instead of the process level. A drawback is: we don't have a good way of exposing timing problems. An example might be, maybe if a packet comes in before the process is ready for it the process crashes. That class of bugs seems rare to me, so I think this approach is safe to move forward with.

    It's also much easier! This doesn't require sudo, just an extra daemon which packets flow through. It also has the advantage of not caring at all how the process runs, so the same tests should work modulo any refactoring which we choose to do.

    Some work left to be done:

    • [x] There's a race condition which causes mitmproxy to sometimes return 'NoneType' object has no attribute 'tell'. What is going wrong?
    • [ ] Should we really be using a fifo? Maybe a socket makes more sense? A hostname:port is much more portable (though we would still need some way of blocking to make sure changes have been made, and would have to rely on a tool such as netcat in order to keep using COPY)
    • [ ] Refactor fluent.py into something more maintainable, the double-queue solution probably isn't optimal.
    • [ ] The Handler and Mixin hierarchy is cute but there's definitely a better way. Separating out the Building and the Handling seems like it'd improve things.
    • [ ] Convert all of our manual tests into automated tests under this framework!
    • [ ] Either pg_regress_multi.pl or fluent.py should redirect the proxy output to a log file
    • [x] How hard would it be to extend this to cancellation testing?
    • [x] Instead of bumping the Citus version, add citus.mitmproxy() to the test functions?
    • [ ] Currently it's only possible to fail one of the workers, should we add a second mitmproxy so we can play with that worker as well?
    • [ ] DROP TABLE crashes if you drop the connection after DROP TABLE IF EXISTS is sent. Open a ticket!
    • [ ] What do we do about test output which includes PIDs?
    • [ ] Profile this and figure out how to improve performance? 6 tests take 10 seconds to run.
    • [x] recorder.dump() returns an empty line, fix that!
    • [ ] Some of these tests could be made faster. Instead of recreating tables with different settings of shard_replication_factor, we can start by using the higher replication factor, then manually set some of the shards inactive.
    • [x] Make failure_drop_table consistent, prepared transactions have random ids, introduce something like enable_unique_job_ids -> enable_unique_transaction_ids
    • [x] Make the failure_task_tracker_executor test reproducible (don't hard-code filepaths)
    opened by lithp 29
  • Real-Time select queries might get stuck and never finish under high load

    Steps to reproduce (almost every time it happens):

    • 1+3 c4.4xlarge cluster
    • Citus 7.1.4 - Postgres 10.0
    CREATE TYPE complex AS (
        r       double precision,
        i       double precision
    );
    
    SELECT run_command_on_workers('
    CREATE TYPE complex AS (
        r       double precision,
        i       double precision
    );');
    
    CREATE TABLE test_table_1 
    	(key int, 
    	 occurred_at timestamp DEFAULT now(), 
    	 value_1 jsonb, 
    	 value_2 text[], 
    	 value_3 int4range, 
    	 value_4 complex NOT NULL);
    SELECT create_distributed_table('test_table_1', 'key');
    

    fab pg.set_config:max_wal_size,"'5GB'"
    fab pg.set_config:max_connections,1000

    Load some data with pgbench:

    \set aid  random(1, 100000)
    \set bid  random(1, 100000)
    \set cid  random(1, 100000)
    \set did  random(1, 100000)
    
    INSERT INTO test_table_1 (key, value_1, value_2, value_3, value_4) VALUES 
    						 (:aid,
    						  row_to_json(row(:aid,:bid, row(:cid+10, :did+500,:aid *7), row(:cid-10, :did-500,:aid *71))),
    						  ARRAY[:aid::text, :bid::text, :cid::text, :did::text, 'Onder Kalaci', 'CitusData Rocks']::text[],
    						  int4range(:aid, :aid + :bid),
    						    (:aid, :did)::complex);
    

    Then pgbench command:

    pgbench -f insert_only.sql -P 3 -c128 -j512 -t 1000 -n

    Real-time select pgbench file:

    SELECT * FROM test_table_1 LIMIT 10;
    

    pgbench command: pgbench -f select.sql -P 3 -c32 -j32 -t 512 -n

    At some point, some of the clients block with the following backtrace:

    #0  0x00007f4aa73bc9d0 in __poll_nocancel () from /lib64/libc.so.6
    #1  0x00007f4a9f74dd77 in MultiClientWait ([email protected]=0x263de98) at executor/multi_client_executor.c:822
    #2  0x00007f4a9f74f343 in MultiRealTimeExecute ([email protected]=0x25d2778) at executor/multi_real_time_executor.c:177
    #3  0x00007f4a9f74e4aa in RealTimeExecScan (node=0x2557f30) at executor/multi_executor.c:274
    #4  0x000000000063178d in ExecCustomScan (pstate=0x2557f30) at nodeCustom.c:118
    #5  0x00000000006239f0 in ExecProcNodeFirst (node=0x2557f30) at execProcnode.c:430
    #6  0x0000000000638570 in ExecProcNode (node=0x2557f30) at ../../../src/include/executor/executor.h:250
    #7  ExecLimit (pstate=0x2557778) at nodeLimit.c:95
    #8  0x00000000006239f0 in ExecProcNodeFirst (node=0x2557778) at execProcnode.c:430
    #9  0x000000000061ec76 in ExecProcNode (node=0x2557778) at ../../../src/include/executor/executor.h:250
    #10 ExecutePlan ([email protected]=0x2557560, planstate=0x2557778, use_parallel_mode=0 '\000', [email protected]=CMD_SELECT, [email protected]=1 '\001', [email protected]=0, 
        [email protected]=ForwardScanDirection, [email protected]=0x26a7d68, [email protected]=1 '\001') at execMain.c:1721
    #11 0x000000000061fb49 in standard_ExecutorRun (queryDesc=0x259aa80, direction=ForwardScanDirection, count=0, execute_once=<optimized out>) at execMain.c:363
    #12 0x000000000061fbc5 in ExecutorRun ([email protected]=0x259aa80, [email protected]=ForwardScanDirection, [email protected]=0, execute_once=<optimized out>) at execMain.c:306
    #13 0x00000000007626b8 in PortalRunSelect ([email protected]=0x2590a60, [email protected]=1 '\001', count=0, [email protected]=9223372036854775807, [email protected]=0x26a7d68) at pquery.c:932
    #14 0x0000000000763cd4 in PortalRun ([email protected]=0x2590a60, [email protected]=9223372036854775807, [email protected]=1 '\001', [email protected]=1 '\001', [email protected]=0x26a7d68, 
        [email protected]=0x26a7d68, [email protected]=0x7fff09b00f30 "") at pquery.c:773
    #15 0x00000000007603e2 in exec_simple_query ([email protected]=0x24ecdc0 "SELECT * FROM test_table_1 LIMIT 10;") at postgres.c:1099
    #16 0x000000000076200a in PostgresMain (argc=<optimized out>, [email protected]=0x246ee28, dbname=0x246ed40 "ec2-user", username=<optimized out>) at postgres.c:4090
    #17 0x00000000006eaad7 in BackendRun ([email protected]=0x24654d0) at postmaster.c:4357
    #18 0x00000000006ec8c5 in BackendStartup ([email protected]=0x24654d0) at postmaster.c:4029
    #19 0x00000000006ecb6e in ServerLoop () at postmaster.c:1753
    #20 0x00000000006ede10 in PostmasterMain ([email protected]=3, [email protected]=0x24228c0) at postmaster.c:1361
    #21 0x000000000066372f in main (argc=3, argv=0x24228c0) at main.c:228
    

    Some random notes:

    • If you use higher -j and -c values with pgbench, we hit this issue more frequently. We see many more WARNING: could not establish asynchronous connection after 5000 ms messages and other connection errors. And, some of the clients got stuck with the same backtrace.
    • If you create some indexes, you'd still see the issue, but probably less
    • The same problem happens with coordinator insert ... select with real-time executor:
    
    #0  0x00007fa31a33e9d0 in __poll_nocancel () from /lib64/libc.so.6
    #1  0x00007fa3126cfd77 in MultiClientWait ([email protected]=0x21a2a90) at executor/multi_client_executor.c:822
    #2  0x00007fa3126d1343 in MultiRealTimeExecute ([email protected]=0x20cc6f8) at executor/multi_real_time_executor.c:177
    #3  0x00007fa3126d04aa in RealTimeExecScan (node=0x21200d0) at executor/multi_executor.c:274
    #4  0x000000000063178d in ExecCustomScan (pstate=0x21200d0) at nodeCustom.c:118
    #5  0x00000000006239f0 in ExecProcNodeFirst (node=0x21200d0) at execProcnode.c:430
    #6  0x0000000000638570 in ExecProcNode (node=0x21200d0) at ../../../src/include/executor/executor.h:250
    #7  ExecLimit (pstate=0x211f918) at nodeLimit.c:95
    #8  0x00000000006239f0 in ExecProcNodeFirst (node=0x211f918) at execProcnode.c:430
    #9  0x000000000061ec76 in ExecProcNode (node=0x211f918) at ../../../src/include/executor/executor.h:250
    #10 ExecutePlan ([email protected]=0x211f700, planstate=0x211f918, use_parallel_mode=0 '\000', [email protected]=CMD_SELECT, [email protected]=1 '\001', [email protected]=0, 
        [email protected]=ForwardScanDirection, [email protected]=0x2105ba0, [email protected]=1 '\001') at execMain.c:1721
    #11 0x000000000061fb49 in standard_ExecutorRun (queryDesc=0x209ad90, direction=ForwardScanDirection, count=0, execute_once=<optimized out>) at execMain.c:363
    #12 0x000000000061fbc5 in ExecutorRun ([email protected]=0x209ad90, [email protected]=ForwardScanDirection, [email protected]=0, execute_once=<optimized out>) at execMain.c:306
    #13 0x00000000007626b8 in PortalRunSelect ([email protected]=0x20c0f58, [email protected]=1 '\001', count=0, [email protected]=9223372036854775807, [email protected]=0x2105ba0) at pquery.c:932
    #14 0x0000000000763cd4 in PortalRun ([email protected]=0x20c0f58, [email protected]=9223372036854775807, [email protected]=0 '\000', [email protected]=1 '\001', [email protected]=0x2105ba0, 
        [email protected]=0x2105ba0, [email protected]=0x0) at pquery.c:773
    #15 0x00007fa3126ce9c6 in ExecuteIntoDestReceiver ([email protected]=0x2104268, [email protected]=0x0, [email protected]=0x2105ba0) at executor/insert_select_executor.c:181
    #16 0x00007fa3126ceae8 in ExecuteSelectIntoRelation ([email protected]=16740, [email protected]=0x2105030, [email protected]=0x2104268, 
        [email protected]=0x2098d80) at executor/insert_select_executor.c:140
    #17 0x00007fa3126cebb8 in CoordinatorInsertSelectExecScan (node=0x2098f98) at executor/insert_select_executor.c:75
    #18 0x000000000063178d in ExecCustomScan (pstate=0x2098f98) at nodeCustom.c:118
    #19 0x00000000006239f0 in ExecProcNodeFirst (node=0x2098f98) at execProcnode.c:430
    #20 0x000000000061ec76 in ExecProcNode (node=0x2098f98) at ../../../src/include/executor/executor.h:250
    #21 ExecutePlan ([email protected]=0x2098d80, planstate=0x2098f98, use_parallel_mode=0 '\000', [email protected]=CMD_INSERT, [email protected]=0 '\000', [email protected]=0, 
        [email protected]=ForwardScanDirection, [email protected]=0x2101348, [email protected]=1 '\001') at execMain.c:1721
    #22 0x000000000061fb49 in standard_ExecutorRun (queryDesc=0x1fad6a0, direction=ForwardScanDirection, count=0, [email protected]=1 '\001') at execMain.c:363
    #23 0x000000000061fbc5 in ExecutorRun ([email protected]=0x1fad6a0, [email protected]=ForwardScanDirection, [email protected]=0, [email protected]=1 '\001') at execMain.c:306
    #24 0x0000000000762d45 in ProcessQuery ([email protected]=0x2101028, sourceText=<optimized out>, params=0x0, queryEnv=0x0, [email protected]=0x2101348, [email protected]=0x7ffc6ce39e00 "")
        at pquery.c:161
    #25 0x0000000000762f74 in PortalRunMulti ([email protected]=0x20c0e40, [email protected]=1 '\001', [email protected]=0 '\000', [email protected]=0x2101348, 
        [email protected]=0x2101348, [email protected]=0x7ffc6ce39e00 "") at pquery.c:1286
    #26 0x0000000000763d70 in PortalRun ([email protected]=0x20c0e40, [email protected]=9223372036854775807, [email protected]=1 '\001', [email protected]=1 '\001', [email protected]=0x2101348, 
        [email protected]=0x2101348, [email protected]=0x7ffc6ce39e00 "") at pquery.c:799
    #27 0x00000000007603e2 in exec_simple_query ([email protected]=0x202e1d0 "INSERT INTO test_table_1 SELECT * FROM test_table_1 LIMIT 10;") at postgres.c:1099
    #28 0x000000000076200a in PostgresMain (argc=<optimized out>, [email protected]=0x1fafe28, dbname=0x1fafd40 "ec2-user", username=<optimized out>) at postgres.c:4090
    #29 0x00000000006eaad7 in BackendRun ([email protected]=0x1fa64d0) at postmaster.c:4357
    #30 0x00000000006ec8c5 in BackendStartup ([email protected]=0x1fa64d0) at postmaster.c:4029
    #31 0x00000000006ecb6e in ServerLoop () at postmaster.c:1753
    #32 0x00000000006ede10 in PostmasterMain ([email protected]=3, [email protected]=0x1f638c0) at postmaster.c:1361
    #33 0x000000000066372f in main (argc=3, argv=0x1f638c0) at main.c:228
    

    And we get the following output from pgbench which never finishes:

    [[email protected] ~]$ pgbench -f insert_select.sql  -P 3 -c32 -j32 -t 512
    starting vacuum...ERROR:  relation "pgbench_branches" does not exist
    (ignoring this error and continuing anyway)
    ERROR:  relation "pgbench_tellers" does not exist
    (ignoring this error and continuing anyway)
    ERROR:  relation "pgbench_history" does not exist
    (ignoring this error and continuing anyway)
    end.
    progress: 3.0 s, 214.7 tps, lat 132.501 ms stddev 246.319
    progress: 6.0 s, 224.3 tps, lat 155.069 ms stddev 221.287
    progress: 9.0 s, 222.3 tps, lat 135.148 ms stddev 187.363
    progress: 12.0 s, 219.0 tps, lat 151.494 ms stddev 205.526
    progress: 15.0 s, 222.0 tps, lat 141.065 ms stddev 169.540
    progress: 18.0 s, 222.7 tps, lat 137.910 ms stddev 197.463
    progress: 21.0 s, 222.6 tps, lat 141.703 ms stddev 220.887
    progress: 24.0 s, 224.4 tps, lat 148.118 ms stddev 215.877
    progress: 27.0 s, 224.3 tps, lat 138.928 ms stddev 190.332
    WARNING:  could not establish asynchronous connection after 5000 ms
    progress: 30.0 s, 222.4 tps, lat 149.898 ms stddev 269.491
    progress: 33.0 s, 220.9 tps, lat 142.246 ms stddev 185.783
    progress: 36.0 s, 223.4 tps, lat 147.924 ms stddev 206.710
    progress: 39.0 s, 221.0 tps, lat 144.701 ms stddev 210.550
    progress: 42.0 s, 221.7 tps, lat 141.649 ms stddev 186.612
    progress: 45.0 s, 225.0 tps, lat 140.926 ms stddev 222.316
    progress: 48.0 s, 222.6 tps, lat 146.699 ms stddev 198.950
    progress: 51.0 s, 225.0 tps, lat 143.601 ms stddev 222.813
    progress: 54.0 s, 224.4 tps, lat 136.603 ms stddev 184.196
    progress: 57.0 s, 219.7 tps, lat 145.607 ms stddev 232.662
    progress: 60.0 s, 226.4 tps, lat 135.318 ms stddev 209.489
    progress: 63.0 s, 224.3 tps, lat 137.365 ms stddev 225.198
    progress: 66.0 s, 224.3 tps, lat 132.615 ms stddev 156.285
    progress: 69.0 s, 222.7 tps, lat 115.979 ms stddev 142.710
    progress: 72.0 s, 223.3 tps, lat 71.027 ms stddev 22.280
    
    opened by onderkalaci 28
  • Use CustomScan API for query execution

    Reason we're working this:

    • removing the temp table for real-time queries, which is good for efficiency and for running queries on standbys
    • basis for allowing to execute multiple remote queries within one query (i.e. pull to master style queries)
    • fix PREPARE + explain
    • remove code

    Behavioural changes:

    • explain now happens inside the custom scan's explain callback. That's good because it removes a bunch of code and adds features. But it also means the explain output changes - we're now getting called "within" the master query for real-time/tt queries, i.e. the remote explain is now where the sequential scan of the remote query used to be. That's hard to avoid without complicating things, and seems to be the right thing to do anyway, because we'll soon have multiple distributed queries within one query.
    • execution now happens during the actual execution phase, not earlier. This means we should be able to make pg_stat_statements work (if it doesn't already)
    • no temp tables anymore :) (which means pg matviews are now supported...)

    Todo

    • [x] Verify pg_stat_statements now works (i.e. doesn't show "(temp table creation)" and such, and that the times are meaningful)
    • [x] Verify query execution now happens during the 'execute' phase, not the 'bind' phase as before. A good way to test that is to use pgbench with -M extended, and use log_min_duration_statement=0
    • [x] Do some simple benchmarking with a query returning a large number of rows (e.g. just COPY lineitem TO '/dev/null'), to make sure we're not regressing
    • [x] Concurrent pgbench for quick router queries.
    • [x] Basic testing
    • [x] Make sure we still acquire all the right relation level locks

    Fixes #457 Fixes #372 Fixes #713

    Needs Documentation 
    opened by anarazel 28
  • Initial pass at allowing single-shard transactions

    Presently requires that all modifications within that transaction go to a single shard. Still going to figure out what to do about queries that use other executor types.

    Silently accepts SAVEPOINT commands, but rejects ROLLBACKs by ERRORing, aborting the entire transaction (and sending ABORT to remotes).

    Fixes #505

    opened by jasonmp85 27
  • Evaluate functions on master

    Adds support for:

     -- now() evaluated on master and "col = '2016-05-03 ...'" pushed down
    UPDATE tbl SET col = now() WHERE hits_single_shard;
    
    -- stable_func(10) is evaluated on master
    UPDATE tbl SET col = col + stable_func(10) WHERE hits_single_shard;
    
    -- not allowed because we use statement replication
    UPDATE tbl SET col = 10 + stable_func(col) WHERE hits_single_shard;
    
    -- as long as the partition column is IMMUTABLE anything goes
    INSERT INTO tbl VALUES (...);
    
    -- joinTree is also evaluated
    UPDATE tbl SET col = 10 WHERE hits_single_shard AND stamp > now() - interval '1 hour';
    DELETE FROM tbl WHERE hits_single_shard AND stamp < now() - interval '1 day';
    

    Decisions made:

    • Non-constant partition columns are not allowed, the planner must be able to route a query.
    • During INSERT everything, including VOLATILE functions, is allowed.
    • During UPDATE, STABLE functions are allowed. Everything which doesn't rely on the value of a Var is evaluated before being pushed down to the workers. (Volatile is not allowed because multiple rows could be updated, and volatile functions have to be run once per row.)
    • Mutable functions are not allowed within CASE and COALESCE.
    • Both the targetlist and jointree->quals are allowed to have mutable functions.
    • Functions in the onConflictWhere clause of UPSERTs are not supported.

    If you enable DEBUG4 then some output is logged with the results of function evaluation.

    Fixes #213

    opened by lithp 27
  • Convert various UDFs to new connection API

    Adding list of individual items to keep track.

    • [x] master_append_table_to_shard (Brian) PR: https://github.com/citusdata/citus/pull/1149
    • [x] master_expire_table_cache
    • [x] master_run_on_worker (@anarazel) PR : https://github.com/citusdata/citus/pull/863
    • [x] DropShards (Burak) PR: #1136
    • [x] WorkerCreateShard
    • [x] master_update_shard_statistics (Brian) PR: https://github.com/citusdata/citus/pull/1125
    • [x] master_copy_shard_placement + master_move_shard_placement(enterprise) a related issue: https://github.com/citusdata/shard_rebalancer/issues/49
    • [x] recover_prepared_transactions (@marcocitus) PR : https://github.com/citusdata/citus/pull/1109
    • [x] ReplicateShardToAllWorkers

    We have a number of internal functions that should start using the new connection & transaction management API. Doing so enables transaction safety (correct visibility & no deadlock when used in a transaction block), allows cancellation & rollback, avoids connection leaks, and will allow us to remove redundant code.

    Currently, we have the following low-level functions that open connections for modifying or query shard placements with the callers listed below each function:

    • ExecuteRemoteCommand
      • master_expire_table_cache
      • DropShards (master_apply_delete_command, master_drop_all_shards)
    • ExecuteRemoteQuery
      • master_append_table_to_shard
      • WorkerCreateShard (create_distributed_table, master_create_worker_shards, master_create_empty_shard)
    • WorkerShardStats
      • master_update_shard_statistics
    • ExecuteRemoteQueryOrCommand and ExecuteCommandsInParallelAndStoreResults
      • master_run_on_worker
    • SendCommandListToWorkerInSingleTransaction (optional)
      • master_copy_shard_placement
      • ReplicateShardToAllWorkers (master_add_node, upgrade_to_reference_table)
    • RecoverWorkerTransactions
      • recover_prepared_transactions

    master_run_on_worker should be converted to use the low-level connection API, but transaction safety is not currently in scope.

    SendCommandListToWorkerInSingleTransaction already uses the low-level functions in the new connection API, but for transaction safety we might consider using the placement API from #1079. Alternatively, we can leave these as they are, since they're unlikely to appear in the same transaction as DDL/DML.

    These functions should start using the new connection API according to the following table.

    | | API | user | allowed xact mod level | set xact mod level | failure ok | note |
    | --- | --- | --- | --- | --- | --- | --- |
    | master_append_table_to_shard | placement | current | none or data | data | partial | could go through router executor |
    | master_expire_table_cache | node | table owner | any | | yes | |
    | master_copy_shard_placement | placement | table owner | none | | no | low priority |
    | master_run_on_worker | node | current | none | | yes? | should maybe error out on failure |
    | master_update_shard_statistics | placement | current | any | idle (if not set) | yes | |
    | recover_prepared_transactions | node | current | any | | yes | does not touch placements |
    | DropShards | placement | table owner | none, data/multi | data/multi | yes? | should maybe roll back on failure |
    | WorkerCreateShard | placement | table owner | none, data/multi | data/multi | partial? | should maybe roll back on failure, might break some transactions |
    | ReplicateShardToAllWorkers | placement | table owner | none | | no | low priority |

    "API" indicates which connection API function should be used. "placement" API means the caller should use GetPlacementConnection. "node" API means the called should use GetNodeUserDatabaseConnection. Operations that create new shards should ensure that the metadata on the master is created prior to performing the remote command.

    "user" indicates as which user the function should connect to workers.

    "allowed xact mod level" indicates which XactModificationLevel can be permitted. We require all transactional operations to check whether they are compatible with the current XactModificationLevel and optionally set the XactModificationLevel. "none" means that the command can only proceed if there was no preceding transactional operation that set XactModificationLevel. "any" means that we don't check it. "data/multi" means that the command can either be compatible with the router executor ("data") or with multi-shard transactions ("multi"), depending on how the function is (re-)implemented.

    An implementation that serially sends commands over a single connection should use "data" whereas an implementation that sends all commands in parallel should use "multi". Once execution completes, the XactModificationLevel should also be set to the appropriate level.

    "set xact mod level" indicates what XactModificationLevel should be set to at the end of the command. No value means we leave it unchanged. "data/multi" should be either data or multi depending on how the function is (re-implemented).

    "failure ok" indicates whether the remote operations are allowed to fail. I included this mainly to see which functions can use the same command execution logic.

    This issue is part of #873.

    opened by marcocitus 25
  • Allow modification commands which don't contain unpure functions

    Issue by gmcquillan Tuesday May 05, 2015 at 22:53 GMT Originally opened as https://github.com/citusdata/pg_shard/issues/108


    I'm experimenting with using HyperLogLog (HLL) data types in some columns. One problem with these is that they take up quite a lot more space than a BIGINT. pg_shard potentially allays a lot of those issues. The extension that provides the HLL datatype is this one by aggregateknowledge. These two extensions seem complementary for warehousing purposes.

    Surprisingly, these data types work with sharded tables for most types of reads, but not for writes (see below). When I attempt an update like so:

    update test_hll_shard set users = users||hll_hash_text('foobar') where date = date('2015-01-08');
    

    I get:

    ERROR:  cannot plan sharded modification containing values which are not constants or constant expressions
    

    I can sort of work around this by setting the literal bytes in this field. Which works fine -- but adding HLL values requires a read and a write. Since pg_shard (understandably) doesn't allow for more than a single statement transaction, this leaves my use-case vulnerable to race conditions in multi-writer environments.

    Since this function is available on the workers, and is deterministic based on the value of the existing row and the new HLL value to be added, there shouldn't be any issue with dispatching this expression through to the workers.

    Is there a hard limitation preventing pg_shard from dispatching modifications for non-constant expressions?

    feature 
    opened by citus-github-bot 24
  • Enable ALTER TABLE ... ADD UNIQUE and ADD EXCLUDE.

    DESCRIPTION: Adds support for creating the table constraints UNIQUE and EXCLUDE via the ALTER TABLE command without the client having to specify a name.

    ALTER TABLE ... ADD CONSTRAINT UNIQUE ...
    ALTER TABLE ... ADD CONSTRAINT EXCLUDE ...

    commands require the client to provide an explicit constraint name. However, in Postgres it is possible for clients not to provide a name and let Postgres generate it, using the following commands:

    ALTER TABLE ... ADD UNIQUE ...
    ALTER TABLE ... ADD EXCLUDE ...

    This PR enables the same functionality for Citus tables.

    opened by emelsimsek 0
  • repos.citusdata not properly mirroring

    When going to https://repos.citusdata.com/community/ all links within the page point to https://repos.citusdata.com/citusdata/community/ as the root path, but they should all be https://repos.citusdata.com/community/, otherwise they 404. I imagine this is due to the directory pathing on packagecloud being https://packagecloud.io/citusdata/community/ and the pathing carrying over to the mirror.

    opened by Rooba 0
  • Introduce citus_job_{list,status} API

    citus_job_list() is a simple wrapper UDF that shows the records in pg_dist_background_job; it returns all the rows in the associated table.

    citus_job_status(job_id) is a UDF that returns the same rows as the former UDF, plus an extra details column that shows various details on the task.

    TODO:

    • [ ] Add more aggregate functions to citus_job_status
    • [ ] Add some tests, mostly in isolation suite
    • [x] Fix broken multi_extension.out and upgrade_list_citus_objects.out due to new UDF definitions.
    • [ ] Consider an alternative solution where we can report details on all the jobs, not only the job that is supplied in the argument.
    opened by hanefi 0
  • Add NOTICE logs for create_distributed_table

    Settings

    PostgreSQL version: 14

    Citus version: 11.1

    Coordinator node: 4 vCores / 16 GiB RAM, 512 GiB storage

    Worker nodes: 4 nodes, 16 vCores / 512 GiB RAM, 4096 GiB storage

    Problem description

    When executing the create_distributed_table UDF, no NOTICE logs are shown to the user. This may be OK for small tables, but for tables with many partitions and rows it can be annoying and makes it harder to debug a possible issue, since there is no feedback on the execution progress.
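
    In the meantime, one way to get some visibility from a second session (a monitoring sketch, not something this report proposes; citus_stat_activity is the cluster-wide activity view available since Citus 11) is to watch the statements Citus is running for the call:

    -- Show what is currently executing across the cluster for this table
    -- (the ILIKE filter is only illustrative).
    SELECT nodeid, state, query_start, left(query, 80) AS query
    FROM citus_stat_activity
    WHERE query ILIKE '%notice.orders%';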

    How to reproduce

    Execute the following SQL statements:

    CREATE SCHEMA notice;
    
    CREATE TABLE notice.orders (
        id bigint,
        order_time timestamp without time zone NOT NULL,
        region_id bigint NOT NULL
    )
    PARTITION BY RANGE (order_time);
    
    SELECT create_time_partitions(
      table_name         := 'notice.orders',
      partition_interval := '1 day',
      start_from         := now() - '5 days'::interval,
      end_at             := now()
    );
    

    Next, execute the create_distributed_table statement:

    SELECT create_distributed_table('notice.orders', 'region_id');

    The output can be seen next:

    citus=> SELECT create_distributed_table('notice.orders', 'region_id');
     create_distributed_table
    --------------------------
    
    (1 row)
    

    After it, execute the undistribute_table UDF:

    SELECT undistribute_table('notice.orders');

    You will see that this function has many NOTICE logs, improving visibility on the execution progress:

    citus=> SELECT undistribute_table('notice.orders');
    NOTICE:  converting the partitions of notice.orders
    NOTICE:  creating a new table for notice.orders_p2022_12_16
    NOTICE:  moving the data of notice.orders_p2022_12_16
    NOTICE:  dropping the old notice.orders_p2022_12_16
    NOTICE:  renaming the new table to notice.orders_p2022_12_16
    NOTICE:  creating a new table for notice.orders_p2022_12_17
    NOTICE:  moving the data of notice.orders_p2022_12_17
    NOTICE:  dropping the old notice.orders_p2022_12_17
    NOTICE:  renaming the new table to notice.orders_p2022_12_17
    NOTICE:  creating a new table for notice.orders_p2022_12_18
    NOTICE:  moving the data of notice.orders_p2022_12_18
    NOTICE:  dropping the old notice.orders_p2022_12_18
    NOTICE:  renaming the new table to notice.orders_p2022_12_18
    NOTICE:  creating a new table for notice.orders_p2022_12_19
    NOTICE:  moving the data of notice.orders_p2022_12_19
    NOTICE:  dropping the old notice.orders_p2022_12_19
    NOTICE:  renaming the new table to notice.orders_p2022_12_19
    NOTICE:  creating a new table for notice.orders_p2022_12_20
    NOTICE:  moving the data of notice.orders_p2022_12_20
    NOTICE:  dropping the old notice.orders_p2022_12_20
    NOTICE:  renaming the new table to notice.orders_p2022_12_20
    NOTICE:  creating a new table for notice.orders_p2022_12_21
    NOTICE:  moving the data of notice.orders_p2022_12_21
    NOTICE:  dropping the old notice.orders_p2022_12_21
    NOTICE:  renaming the new table to notice.orders_p2022_12_21
    NOTICE:  creating a new table for notice.orders
    NOTICE:  dropping the old notice.orders
    NOTICE:  renaming the new table to notice.orders
     undistribute_table
    --------------------
    
    (1 row)
    
    opened by lucasbfernandes 0
  • Valid SQL queries are failing with the following error:

    Valid SQL queries are failing with the following error: "DETAIL:  Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table".

    Settings

    PostgreSQL version: 14

    Citus version: 11.1

    Coordinator node: 4 vCores / 16 GiB RAM, 512 GiB storage

    Worker nodes: 4 nodes, 16 vCores / 512 GiB RAM, 4096 GiB storage

    Problem description

    A valid query is failing with the following error:

    DETAIL:  Complex subqueries, CTEs and local tables cannot be in the outer part of an outer join with a distributed table

    How to reproduce

    Execute the following SQL statements:

    CREATE SCHEMA join_order;
    
    CREATE TABLE join_order.enterprises (
        id bigint,
        name text
    );
    
    CREATE TABLE join_order.employees (
        id bigint,
        name text,
        enterprise_id bigint,
        department_id bigint
    );
    
    CREATE TABLE join_order.departments (
        id bigint,
        name text,
        cost_center_id bigint
    );
    
    CREATE TABLE join_order.cost_centers (
        id bigint,
        name text
    );
    
    CREATE TABLE join_order.purchase_requests (
        id bigint,
        metadata jsonb,
        request_time timestamp without time zone NOT NULL,
        enterprise_id bigint
    );
    
    SELECT create_distributed_table('join_order.purchase_requests', 'enterprise_id');
    SELECT create_reference_table('join_order.enterprises');
    SELECT create_reference_table('join_order.employees');
    SELECT create_reference_table('join_order.departments');
    SELECT create_reference_table('join_order.cost_centers');
    

    This will create 4 reference tables and one distributed table. Execute the following query to see the error:

    WITH 
        cte1 as (
            SELECT * FROM join_order.enterprises
        ),
        cte2 as (
            SELECT * FROM join_order.employees
        ),
        cte3 as (
            SELECT * FROM join_order.departments 
        ),
        cte4 as (
            SELECT * FROM join_order.cost_centers
        )
    SELECT
        c1.name,
        c2.name,
        c3.name,
        c4.name,
        count(*) as number_of_requests
    FROM cte1 c1
    INNER JOIN cte2 c2 ON c1.id = c2.enterprise_id
    LEFT OUTER JOIN cte3 c3 ON c3.id = c2.department_id
    LEFT OUTER JOIN cte4 c4 ON c4.id = c3.cost_center_id
    RIGHT OUTER JOIN join_order.purchase_requests pr ON pr.enterprise_id = c1.id
    GROUP BY 1, 2, 3, 4;
    

    This query follows the pattern of:

    SELECT <columns> FROM cte1    // enterprises
    INNER JOIN cte2               // employees
    LEFT OUTER JOIN cte3          // departments
    LEFT OUTER JOIN cte4          // cost centers
    RIGHT OUTER JOIN distributed1 // purchase requests
    

    It is a valid pattern, with the distributed table being the one in the outer part of the outer join with a CTE. We believe this is happening because of a PostgreSQL internal rewrite of the join order. Is it possible to increase our SQL coverage here?
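
    For reference, the same join tree can be written with the distributed table syntactically on the preserved side (an equivalent formulation of the query above, shown only to make the pattern explicit; whether the planner handles it differently has not been verified here):

    WITH
        cte1 as (SELECT * FROM join_order.enterprises),
        cte2 as (SELECT * FROM join_order.employees),
        cte3 as (SELECT * FROM join_order.departments),
        cte4 as (SELECT * FROM join_order.cost_centers)
    SELECT
        c1.name, c2.name, c3.name, c4.name,
        count(*) as number_of_requests
    FROM join_order.purchase_requests pr
    LEFT OUTER JOIN (
        cte1 c1
        INNER JOIN cte2 c2 ON c1.id = c2.enterprise_id
        LEFT OUTER JOIN cte3 c3 ON c3.id = c2.department_id
        LEFT OUTER JOIN cte4 c4 ON c4.id = c3.cost_center_id
    ) ON pr.enterprise_id = c1.id
    GROUP BY 1, 2, 3, 4;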

    opened by lucasbfernandes 0
  • INSERT/SELECT with repartition triggering an out of disk error

    INSERT/SELECT with repartition triggering an out of disk error

    Settings

    PostgreSQL version: 14

    Citus version: 11.1

    Coordinator node: 4 vCores / 16 GiB RAM, 512 GiB storage

    Worker nodes: 4 nodes, 16 vCores / 512 GiB RAM, 4096 GiB storage

    Problem description

    Running an INSERT/SELECT statement to fill a distributed table on schema B with data from another distributed table on schema A triggers an out-of-disk error, even though there is more than enough disk on the workers to store the results of the query. The error can be seen next:

    ERROR:  could not write to file "base/pgsql_tmp/pgsql_tmp4110.60": No space left on device
    CONTEXT:  while executing command on <postgresql_url>:5432
    

    How to reproduce

    Execute the following SQL statements:

    CREATE SCHEMA disk_source;
    
    CREATE TABLE disk_source.requests (
        id bigint,
        metadata jsonb,
        request_time timestamp without time zone NOT NULL,
        department_id bigint NOT NULL
    )
    PARTITION BY RANGE (request_time);
    
    SELECT create_time_partitions(
      table_name         := 'disk_source.requests',
      partition_interval := '1 day',
      start_from         := now() - '4 years'::interval,
      end_at             := now()
    );
    
    SELECT create_distributed_table('disk_source.requests', 'department_id');
    

    This will create the source table. Next, we must populate it with data. For this, create a VM geographically close to the cluster and set up a screen session to execute the next statement (it will take some hours to finish and will create ~6 TB of data, spread evenly across all 4 worker nodes):

    INSERT INTO disk_source.requests
    SELECT
        generate_series(1, 1000000000)::bigint as id,
        '{"guid":"049f2dcf-0046-48ea-b2dc-f593dcbad3d8","isActive":false,"balance":"$1,563.29","picture":"http://placehold.it/32x32","age":37,"eyeColor":"blue","name":"Eula Simmons","email":"[email protected]","phone":"+1 (909) 123123-3289","address":"Address test 123","about":"Lorem cillum pariatur esse ullamco fugiat officia eu nostrud ex nostrud.","registered":"2015-01-15T10:30:53 +02:00","latitude":-72.598788,"longitude":107.073131}' as metadata,
        now() - justify_hours(random() * (interval '4 years')) as request_time,
        floor(random() * 10000 + 1)::bigint as department_id;
    

    After the transaction finishes, execute the next SQL statements:

    CREATE SCHEMA disk_target;
    
    CREATE TABLE disk_target.requests (
        id bigint,
        metadata jsonb,
        request_time timestamp without time zone NOT NULL,
        department_id bigint NOT NULL,
        another_dist_id bigint NOT NULL
    )
    PARTITION BY RANGE (request_time);
    
    SELECT create_time_partitions(
      table_name         := 'disk_target.requests',
      partition_interval := '1 day',
      start_from         := now() - '4 years'::interval,
      end_at             := now()
    );
    
    SELECT create_distributed_table('disk_target.requests', 'another_dist_id');
    

    Using another screen session inside the VM, execute the next statement:

    INSERT INTO disk_target.requests
    SELECT
        dsr.*,
        floor(random() * 10000 + 1)::bigint as another_dist_id
    FROM disk_source.requests dsr;
    

    It will take some hours to fail, and we believe the reason is the amount of storage used by the job cache directory, which stores the intermediate results generated by a repartition operation. For this operation we have seen that directory's disk usage grow to more than 2 TB on each worker node.
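
    One way to watch the space being consumed while the INSERT/SELECT runs (a monitoring sketch, not part of the original report): pg_ls_tmpdir() lists the temporary files under base/pgsql_tmp, the directory named in the error, and requires superuser or the pg_monitor role; the job cache directory itself can be checked with du on each worker's data directory.

    -- Run on each worker: total size and count of temporary files under base/pgsql_tmp.
    SELECT pg_size_pretty(sum(size)) AS tmp_size, count(*) AS tmp_files
    FROM pg_ls_tmpdir();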

    opened by lucasbfernandes 0